2028 lines
78 KiB
Python
2028 lines
78 KiB
Python
import tkinter as tk
|
|
from tkinter import ttk, messagebox
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import re
|
|
import ctypes
|
|
import sys
|
|
import ipaddress
|
|
from typing import List, Optional, Dict
|
|
from dataclasses import dataclass
|
|
import threading
|
|
import time
|
|
import socket
|
|
import queue
|
|
import struct
|
|
|
|
# Optional third-party integrations. Each import is attempted once at module
# load; a module-level *_AVAILABLE flag records whether it worked, and a hint
# is printed when the package is missing.

# Ping support (pythonping)
try:
    from pythonping import ping as pythonping
except ImportError:
    PING_LIBRARY_AVAILABLE = False
    print(
        "PythonPing module not found. Install with 'pip install pythonping' for better ping functionality."
    )
else:
    PING_LIBRARY_AVAILABLE = True

# SNMP support (pysnmp)
try:
    from pysnmp.hlapi import *
except ImportError:
    SNMP_AVAILABLE = False
    print(
        "pysnmp module not found. Install with 'pip install pysnmp' for SNMP functionality."
    )
else:
    SNMP_AVAILABLE = True

# WMI-based adapter discovery
try:
    import wmi
except ImportError:
    WMI_AVAILABLE = False
    print(
        "WMI module not found. Using alternative method for network interface detection."
    )
else:
    WMI_AVAILABLE = True

# MAC vendor lookup (mac-vendor-lookup)
try:
    from mac_vendor_lookup import MacLookup

    MAC_LOOKUP_AVAILABLE = True
    # Shared module-level lookup service
    mac_lookup = MacLookup()
    # Refreshing the vendor database can be slow, so it is not done here:
    # mac_lookup.update_vendors()  # Uncomment to update database on startup
except ImportError:
    MAC_LOOKUP_AVAILABLE = False
    print(
        "mac-vendor-lookup module not found. Install with 'pip install mac-vendor-lookup' for MAC vendor lookup functionality."
    )
|
|
|
|
|
|
@dataclass
class NetworkInterface:
    """Snapshot of one network adapter's IPv4 settings.

    Instances are built by the interface-refresh routines (WMI or netsh) and
    looked up by ``name`` when the user picks an entry in the interface combo.
    """

    # Connection name; this is the value listed in the interface combobox.
    name: str
    # Adapter description (the netsh path reuses the connection name here).
    description: str
    # First IPv4 address reported for the adapter, or "" when none was found.
    ip_address: str = ""
    # Subnet mask for ip_address, or "" when none was found.
    subnet_mask: str = ""
    # Default gateway, or "" when none was found.
    gateway: str = ""
    # True when the adapter is reported as DHCP-configured.
    dhcp_enabled: bool = True
    # Only interfaces with active=True are offered for selection.
    active: bool = True
    # Adapter GUID when discovered through WMI; left "" by the netsh path.
    adapter_id: str = ""
|
|
|
|
|
|
class IPChangeConfig:
    """Persist application settings and per-subnet ping targets as JSON.

    Two files (paths relative to the current working directory) are managed:

    * ``config_file`` -- last selected interface, last IP prefix and the
      previous interface configuration.
    * ``ping_targets_file`` -- mapping of IP prefix to ping target.

    ``history_file`` only names the IP-history file; this class never reads
    or writes it.

    Attributes:
        last_interface: Name of the interface selected most recently.
        last_ip_prefix: IP prefix entered most recently.
        previous_config: Free-form dict stored under ``"previous_config"``.
        ping_targets: Dict mapping an IP prefix to its saved ping target.
    """

    def __init__(self):
        """Set the file names and load both JSON files (missing files are fine)."""
        self.config_file = "ip_config.json"
        self.history_file = "ip_history.json"
        self.ping_targets_file = "ping_targets.json"
        self.load_config()
        self.load_ping_targets()

    @staticmethod
    def _read_json(path):
        """Return the parsed JSON document at *path*, or None if the file is absent."""
        if not os.path.exists(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _prefixes_overlap(first, second):
        """Return True when one dotted prefix equals or contains the other.

        The comparison respects octet boundaries, so "192.168.1" overlaps
        "192.168" and "192.168.1.7" but not "192.168.10".
        """
        return (
            first == second
            or first.startswith(second + ".")
            or second.startswith(first + ".")
        )

    def load_config(self):
        """Load the settings file, falling back to empty defaults.

        Any read or parse problem is reported on stdout and leaves the
        defaults in place; nothing is raised.
        """
        # Start from defaults so every failure path leaves usable attributes.
        self.last_interface = ""
        self.last_ip_prefix = ""
        self.previous_config = {}

        try:
            config = self._read_json(self.config_file)
        except (OSError, ValueError) as e:  # ValueError covers bad JSON/encoding
            print(f"Error loading config: {e}")
            return

        if config is None:
            return
        if not isinstance(config, dict):
            print(f"Error loading config: expected a JSON object in {self.config_file}")
            return

        self.last_interface = config.get("last_interface", "")
        self.last_ip_prefix = config.get("last_ip_prefix", "")
        self.previous_config = config.get("previous_config", {})

    def save_config(self):
        """Write the current settings to ``config_file`` (best effort)."""
        config = {
            "last_interface": self.last_interface,
            "last_ip_prefix": self.last_ip_prefix,
            "previous_config": self.previous_config,
        }
        try:
            with open(self.config_file, "w", encoding="utf-8") as f:
                json.dump(config, f)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving config: {e}")

    def load_ping_targets(self):
        """Load saved ping targets for each IP prefix.

        ``ping_targets`` is always a dict afterwards: a missing, unreadable or
        wrongly shaped file yields an empty mapping.
        """
        self.ping_targets = {}  # Dictionary to store ping targets by IP prefix
        try:
            targets = self._read_json(self.ping_targets_file)
        except (OSError, ValueError) as e:
            print(f"Error loading ping targets: {e}")
            return

        if targets is None:
            return
        if not isinstance(targets, dict):
            # A list/str here would break the .items() lookup in get_ping_target.
            print(
                f"Error loading ping targets: expected a JSON object in {self.ping_targets_file}"
            )
            return
        self.ping_targets = targets

    def save_ping_targets(self):
        """Save ping targets for each IP prefix (best effort)."""
        try:
            with open(self.ping_targets_file, "w", encoding="utf-8") as f:
                json.dump(self.ping_targets, f)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving ping targets: {e}")

    def get_ping_target(self, ip_prefix):
        """Get the saved ping target for the given IP prefix.

        Lookup order: exact key, then the first saved prefix that contains or
        is contained in *ip_prefix* on an octet boundary, then the default
        ``"<ip_prefix>.1"``. An empty *ip_prefix* returns ``""``.
        """
        if not ip_prefix:
            return ""

        # Try to find exact match first
        if ip_prefix in self.ping_targets:
            return self.ping_targets[ip_prefix]

        # No exact match: accept a broader or narrower prefix of the same
        # network. Matching whole octets keeps "192.168.10" from picking up
        # the target saved for "192.168.1".
        for prefix, target in self.ping_targets.items():
            if self._prefixes_overlap(ip_prefix, prefix):
                return target

        # Default target: IP prefix + ".1" (usually the gateway)
        return f"{ip_prefix}.1"

    def set_ping_target(self, ip_prefix, target):
        """Save a ping target for the given IP prefix and persist it.

        Nothing happens when either argument is empty.
        """
        if ip_prefix and target:
            self.ping_targets[ip_prefix] = target
            self.save_ping_targets()
|
|
|
|
|
|
class IPChangerApp:
|
|
def __init__(self, master):
    """Build the main window, load persisted state and populate the UI.

    Args:
        master: The Tk container the application lays its widgets out in.

    The statement order matters: the widgets are created before the mask
    traces, the history display and the interface refresh, all of which
    touch widget attributes.
    """
    self.master = master
    self.master.title("Network Interface IP Configuration")
    self.master.geometry("900x700")

    # Initialize the configuration before anything else
    self.config = IPChangeConfig()

    # Initialize data structures
    self.interfaces: List[NetworkInterface] = []
    self.ip_history: List[str] = []

    # Variables for continuous ping
    self.continuous_ping = tk.BooleanVar(value=False)
    self.ping_running = False
    self.ping_stop_event = threading.Event()
    self.last_ping_time = tk.StringVar(
        value="N/A"
    )  # Add variable for last ping time

    # Variables for IP scanning
    self.scan_results = []
    self.scan_running = False
    self.scan_stop_event = threading.Event()
    self.scan_queue = queue.Queue()
    self.scan_threads = []
    self.scan_progress_var = tk.IntVar(value=0)

    # Initialize WMI if it is available
    if WMI_AVAILABLE:
        self.wmi_client = wmi.WMI()

    # Load saved data before creating widgets.
    # NOTE(review): this runs before any widget exists, so log output
    # produced during this call cannot rely on self.log_text being present.
    self.load_ip_history()

    # Create the tabbed interface
    self.create_widgets()

    # Initialize trace variables
    # NOTE(review): no visible code assigns anything else to these two ids.
    self.subnet_trace_id = None
    self.cidr_trace_id = None

    # Set up initial traces safely
    self.setup_mask_traces()

    # Update the list of IPs in the combo
    self.update_history_display()

    # Refresh interfaces
    self.refresh_interfaces()

    # Configure grid weights
    self.master.grid_columnconfigure(0, weight=1)
    self.master.grid_rowconfigure(1, weight=1)  # The log row is expandable

    # Initialize MAC lookup if available
    if MAC_LOOKUP_AVAILABLE:
        self.mac_lookup = MacLookup()
|
|
|
|
def setup_mask_traces(self):
    """Attach write callbacks to the subnet-mask and CIDR entry variables."""
    watched = (
        (self.subnet_mask, self.on_subnet_mask_changed),
        (self.cidr_bits, self.on_cidr_bits_changed),
    )

    # Write each variable's current value back to itself before any
    # callback is attached.
    for variable, _ in watched:
        variable.set(variable.get())

    # Register the "w" (write) traces: mask first, then CIDR.
    for variable, callback in watched:
        variable.trace("w", callback)

    self.log_message("Subnet mask traces initialized")
|
|
|
|
def remove_mask_traces(self):
    """Deliberate no-op kept so existing callers keep working.

    Traces are never detached; a different approach is used instead.
    """
    return None
|
|
|
|
def get_ip_prefix(self, ip: str) -> str:
    """Return the first three dot-separated octets of *ip*.

    Input with fewer than three parts is returned unchanged.
    """
    octets = ip.split(".")
    return ip if len(octets) < 3 else ".".join(octets[:3])
|
|
|
|
def create_widgets(self):
    """Create the two-tab notebook and the operation log shared by both tabs."""
    # --- Notebook and its tabs -------------------------------------------
    notebook = ttk.Notebook(self.master)
    self.notebook = notebook
    notebook.grid(row=0, column=0, sticky="nsew", padx=10, pady=10)

    self.ip_setup_tab = ttk.Frame(notebook, padding="10")
    self.tools_tab = ttk.Frame(notebook, padding="10")
    for tab, title in ((self.ip_setup_tab, "IP Setup"), (self.tools_tab, "Tools")):
        tab.columnconfigure(0, weight=1)  # let the tab content stretch
        notebook.add(tab, text=title)

    # Fill each tab with its own controls
    self.create_ip_setup_tab()
    self.create_tools_tab()

    # --- Operation log (below the notebook) -------------------------------
    log_frame = ttk.LabelFrame(self.master, text="Operation Log", padding="5")
    self.log_frame = log_frame
    log_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10))

    # Monospaced font; the bold variant is used by the emphasized tags
    family, size = "Consolas", 10
    log_font = (family, size)
    bold_font = (family, size, "bold")
    self.log_text = tk.Text(log_frame, wrap="none", height=10, font=log_font)

    # One text tag per message category, configured in this order
    tag_styles = (
        ("INFO", {"foreground": "blue"}),
        ("ERROR", {"foreground": "red", "font": bold_font}),
        ("SUCCESS", {"foreground": "green", "font": bold_font}),
        ("WARN", {"foreground": "orange"}),
        ("PING", {"foreground": "purple"}),
        ("DISCOVERY", {"foreground": "navy", "font": bold_font}),
        ("HOSTNAME", {"foreground": "teal"}),
        ("MAC", {"foreground": "maroon"}),
        ("COMMAND", {"foreground": "sienna"}),
        ("VENDOR", {"foreground": "#8B008B"}),  # Dark magenta
    )
    for tag_name, options in tag_styles:
        self.log_text.tag_configure(tag_name, **options)

    # Scrollbars in both directions, wired to the text widget
    self.log_scrollbar_y = ttk.Scrollbar(
        log_frame, orient="vertical", command=self.log_text.yview
    )
    self.log_scrollbar_x = ttk.Scrollbar(
        log_frame, orient="horizontal", command=self.log_text.xview
    )
    self.log_text.configure(
        yscrollcommand=self.log_scrollbar_y.set,
        xscrollcommand=self.log_scrollbar_x.set,
    )

    self.log_text.grid(row=0, column=0, sticky="nsew")
    self.log_scrollbar_y.grid(row=0, column=1, sticky="ns")
    self.log_scrollbar_x.grid(row=1, column=0, sticky="ew")

    clear_button = ttk.Button(log_frame, text="Clear Log", command=self.clear_log)
    clear_button.grid(row=2, column=0, columnspan=2, pady=5)

    # Only the text cell grows when the window is resized
    log_frame.grid_columnconfigure(0, weight=1)
    log_frame.grid_rowconfigure(0, weight=1)
|
|
|
|
def create_ip_setup_tab(self):
    """Build the "IP Setup" tab.

    From top to bottom the tab holds: the interface selector, a read-only
    view of the selected interface's current configuration, the entry
    fields for a new static address, the IP-prefix history selector and
    the row of action buttons. Widgets and Tk variables that other methods
    use are stored on ``self``.
    """
    # Control frame for IP Setup tab
    self.control_frame = ttk.Frame(self.ip_setup_tab)
    self.control_frame.grid(row=0, column=0, sticky="nsew")
    self.control_frame.columnconfigure(0, weight=1)

    # Interfaces section
    self.interfaces_frame = ttk.LabelFrame(
        self.control_frame, text="Network Interfaces", padding="5"
    )
    self.interfaces_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    self.interfaces_frame.columnconfigure(
        1, weight=1
    )  # Make combo box column expandable

    ttk.Label(self.interfaces_frame, text="Select Interface:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.if_var = tk.StringVar()
    self.if_combo = ttk.Combobox(
        self.interfaces_frame, textvariable=self.if_var, state="readonly", width=50
    )
    self.if_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.if_combo.bind("<<ComboboxSelected>>", self.on_interface_selected)

    # Current configuration info
    self.current_frame = ttk.LabelFrame(
        self.control_frame, text="Current Configuration", padding="5"
    )
    self.current_frame.grid(row=1, column=0, sticky="ew", pady=(0, 10))
    self.current_frame.columnconfigure(1, weight=1)  # Make value column expandable

    # Variables backing the read-only labels below
    self.current_ip_var = tk.StringVar()
    self.current_mask_var = tk.StringVar()
    self.current_gateway_var = tk.StringVar()
    self.dhcp_status_var = tk.StringVar()

    # Current IP and copy button - aligned left
    ttk.Label(self.current_frame, text="Current IP:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    # Local helper frame; unrelated to self.ip_frame created further down
    ip_frame = ttk.Frame(self.current_frame)
    ip_frame.grid(row=0, column=1, sticky="w")
    ttk.Label(ip_frame, textvariable=self.current_ip_var).pack(side=tk.LEFT)
    ttk.Button(ip_frame, text="Copy", width=5, command=self.copy_current_ip).pack(
        side=tk.LEFT, padx=5
    )

    ttk.Label(self.current_frame, text="Subnet Mask:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    ttk.Label(self.current_frame, textvariable=self.current_mask_var).grid(
        row=1, column=1, sticky="w"
    )

    # Gateway and copy button - aligned left
    ttk.Label(self.current_frame, text="Gateway:").grid(
        row=2, column=0, sticky="w", padx=5
    )
    gateway_frame = ttk.Frame(self.current_frame)
    gateway_frame.grid(row=2, column=1, sticky="w")
    ttk.Label(gateway_frame, textvariable=self.current_gateway_var).pack(
        side=tk.LEFT
    )
    ttk.Button(
        gateway_frame, text="Copy", width=5, command=self.copy_current_gateway
    ).pack(side=tk.LEFT, padx=5)

    ttk.Label(self.current_frame, text="DHCP Status:").grid(
        row=3, column=0, sticky="w", padx=5
    )
    ttk.Label(self.current_frame, textvariable=self.dhcp_status_var).grid(
        row=3, column=1, sticky="w"
    )

    # IP configuration section
    self.ip_frame = ttk.LabelFrame(
        self.control_frame, text="IP Configuration", padding="5"
    )
    self.ip_frame.grid(row=2, column=0, sticky="ew", pady=(0, 10))
    self.ip_frame.columnconfigure(1, weight=1)  # Make IP entry column expandable

    # IP Prefix row - all aligned left
    ttk.Label(self.ip_frame, text="IP Prefix (first 3 octets):").grid(
        row=0, column=0, sticky="w", padx=5
    )
    ip_prefix_frame = ttk.Frame(self.ip_frame)
    ip_prefix_frame.grid(row=0, column=1, sticky="w", padx=5)

    # Pre-filled with the prefix remembered in the saved configuration
    self.ip_prefix = tk.StringVar(value=self.config.last_ip_prefix)
    self.ip_entry = ttk.Entry(
        ip_prefix_frame, textvariable=self.ip_prefix, width=30
    )
    self.ip_entry.pack(side=tk.LEFT)

    ttk.Label(ip_prefix_frame, text="Last Octet:").pack(side=tk.LEFT, padx=5)
    self.last_octet = tk.StringVar(value="249")  # initial last octet shown
    self.last_octet_entry = ttk.Entry(
        ip_prefix_frame, textvariable=self.last_octet, width=5
    )
    self.last_octet_entry.pack(side=tk.LEFT)

    # Subnet mask row - all aligned left
    ttk.Label(self.ip_frame, text="Subnet Mask:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    subnet_frame = ttk.Frame(self.ip_frame)
    subnet_frame.grid(row=1, column=1, sticky="w", padx=5)

    # Mask and CIDR fields; setup_mask_traces() attaches write callbacks
    # to both variables
    self.subnet_mask = tk.StringVar(value="255.255.255.0")
    self.subnet_entry = ttk.Entry(
        subnet_frame, textvariable=self.subnet_mask, width=15
    )
    self.subnet_entry.pack(side=tk.LEFT)

    ttk.Label(subnet_frame, text="CIDR (/bits):").pack(side=tk.LEFT, padx=5)
    self.cidr_bits = tk.StringVar(value="24")
    self.cidr_entry = ttk.Entry(subnet_frame, textvariable=self.cidr_bits, width=5)
    self.cidr_entry.pack(side=tk.LEFT)

    # History section
    self.history_frame = ttk.LabelFrame(
        self.control_frame, text="IP History", padding="5"
    )
    self.history_frame.grid(row=3, column=0, sticky="ew", pady=(0, 10))
    self.history_frame.columnconfigure(1, weight=1)  # Make combo column expandable

    ttk.Label(self.history_frame, text="Previous IPs:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.history_var = tk.StringVar()
    self.history_combo = ttk.Combobox(
        self.history_frame,
        textvariable=self.history_var,
        state="readonly",
        width=50,
    )
    self.history_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.history_combo.bind("<<ComboboxSelected>>", self.on_history_selected)

    # Action buttons
    self.button_frame = ttk.Frame(self.control_frame)
    self.button_frame.grid(row=4, column=0, sticky="ew", pady=(0, 10))

    # Make the button frame distribute buttons evenly
    for i in range(4):  # For the 4 buttons
        self.button_frame.columnconfigure(i, weight=1)

    ttk.Button(
        self.button_frame, text="Set Static IP", command=self.set_static_ip
    ).grid(row=0, column=0, padx=5)
    ttk.Button(self.button_frame, text="Enable DHCP", command=self.set_dhcp).grid(
        row=0, column=1, padx=5
    )
    ttk.Button(
        self.button_frame, text="Restore Previous", command=self.restore_previous
    ).grid(row=0, column=2, padx=5)
    ttk.Button(
        self.button_frame,
        text="Refresh Interfaces",
        command=self.refresh_interfaces,
    ).grid(row=0, column=3, padx=5)
|
|
|
|
def create_tools_tab(self):
    """Build the "Tools" tab: the ping tool on top, the network scanner below.

    The scanner section contains the IP range / CIDR inputs, the node
    counter and scan buttons, a progress bar bound to
    ``self.scan_progress_var`` and a five-column results tree.
    """
    # Control frame for Tools tab
    self.tools_control_frame = ttk.Frame(self.tools_tab)
    self.tools_control_frame.grid(row=0, column=0, sticky="nsew")
    self.tools_control_frame.columnconfigure(0, weight=1)
    self.tools_control_frame.rowconfigure(
        1, weight=1
    )  # Make the scan frame expandable

    # Network Tools section (ping tool)
    self.ping_frame = ttk.LabelFrame(
        self.tools_control_frame, text="Ping Tool", padding="5"
    )
    self.ping_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    self.ping_frame.columnconfigure(
        1, weight=1
    )  # Make ping target field expandable

    ttk.Label(self.ping_frame, text="Target IP/Host:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.ping_target = tk.StringVar()
    self.ping_entry = ttk.Entry(
        self.ping_frame, textvariable=self.ping_target, width=40
    )
    self.ping_entry.grid(row=0, column=1, sticky="w", padx=5)
    # Bind the FocusOut event to save the ping target
    self.ping_entry.bind("<FocusOut>", self.save_current_ping_target)

    # Add last ping time display field
    ttk.Label(self.ping_frame, text="Last RTT:").grid(
        row=0, column=2, sticky="w", padx=5
    )
    # NOTE(review): __init__ already creates a StringVar under this name;
    # this assignment replaces it with the one the entry below displays.
    self.last_ping_time = tk.StringVar(value="N/A")
    ping_time_display = ttk.Entry(
        self.ping_frame, textvariable=self.last_ping_time, width=8, state="readonly"
    )
    ping_time_display.grid(row=0, column=3, padx=5)

    # Add continuous ping checkbox (bound to the BooleanVar from __init__)
    self.continuous_ping_check = ttk.Checkbutton(
        self.ping_frame, text="Continuous Ping", variable=self.continuous_ping
    )
    self.continuous_ping_check.grid(row=0, column=4, padx=5)

    ttk.Button(self.ping_frame, text="Ping", command=self.do_ping).grid(
        row=0, column=5, padx=5
    )
    ttk.Button(self.ping_frame, text="Stop", command=self.stop_ping).grid(
        row=0, column=6, padx=5
    )

    # Network Scan section - make it expand both horizontally and vertically
    self.scan_frame = ttk.LabelFrame(
        self.tools_control_frame, text="Network Scan", padding="5"
    )
    self.scan_frame.grid(row=1, column=0, sticky="nsew", pady=(0, 10))

    # Configure column weights for scan frame to make it expandable
    self.scan_frame.columnconfigure(0, weight=1)  # Single column for frames
    self.scan_frame.rowconfigure(4, weight=1)  # Make results row expandable

    # Use frames to organize elements in a row and aligned left
    # Start IP and End IP row
    ip_range_frame = ttk.Frame(self.scan_frame)
    ip_range_frame.grid(row=0, column=0, sticky="w", padx=5, pady=2)

    ttk.Label(ip_range_frame, text="Start IP:").pack(side=tk.LEFT, padx=(0, 5))
    self.scan_start_ip = tk.StringVar()
    self.scan_start_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_start_ip, width=15
    )
    self.scan_start_entry.pack(side=tk.LEFT)

    ttk.Label(ip_range_frame, text="End IP:").pack(side=tk.LEFT, padx=(10, 5))
    self.scan_end_ip = tk.StringVar()
    self.scan_end_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_end_ip, width=15
    )
    self.scan_end_entry.pack(side=tk.LEFT)

    ttk.Label(ip_range_frame, text="CIDR (/bits):").pack(side=tk.LEFT, padx=(10, 5))
    self.scan_cidr_bits = tk.StringVar(value="24")
    self.scan_cidr_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_cidr_bits, width=5
    )
    self.scan_cidr_entry.pack(side=tk.LEFT)
    # Add trace to update the scan range when CIDR changes
    self.scan_cidr_bits.trace("w", self.update_scan_range_from_cidr)

    # Nodes to scan and buttons row
    controls_frame = ttk.Frame(self.scan_frame)
    controls_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=2)
    controls_frame.columnconfigure(
        1, weight=1
    )  # Make space between nodes and buttons expandable

    nodes_frame = ttk.Frame(controls_frame)
    nodes_frame.grid(row=0, column=0, sticky="w")
    ttk.Label(nodes_frame, text="Nodes to scan:").pack(side=tk.LEFT, padx=(0, 5))
    self.nodes_to_scan = tk.StringVar(value="0")
    # Read-only display; the value is changed through self.nodes_to_scan
    nodes_display = ttk.Entry(
        nodes_frame, textvariable=self.nodes_to_scan, width=10, state="readonly"
    )
    nodes_display.pack(side=tk.LEFT)

    # Scan buttons - aligned right
    self.scan_buttons_frame = ttk.Frame(controls_frame)
    self.scan_buttons_frame.grid(row=0, column=1, sticky="e")

    ttk.Button(
        self.scan_buttons_frame, text="Start Scan", command=self.start_scan
    ).pack(side=tk.LEFT, padx=5)
    ttk.Button(
        self.scan_buttons_frame, text="Stop Scan", command=self.stop_scan
    ).pack(side=tk.LEFT, padx=5)
    ttk.Button(
        self.scan_buttons_frame,
        text="Get Host Info",
        command=self.gather_host_information,
    ).pack(side=tk.LEFT, padx=5)

    # Scan progress - keep full width and make it expand horizontally
    self.scan_progress = ttk.Progressbar(
        self.scan_frame,
        orient="horizontal",
        length=300,
        mode="determinate",
        variable=self.scan_progress_var,
    )
    self.scan_progress.grid(row=2, column=0, sticky="ew", padx=5, pady=5)

    # Scan results - keep full width
    ttk.Label(self.scan_frame, text="Scan Results:").grid(
        row=3, column=0, sticky="w", padx=5
    )

    # Frame for results list and scrollbar - make it fully expandable
    self.results_frame = ttk.Frame(self.scan_frame)
    self.results_frame.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
    self.results_frame.columnconfigure(0, weight=1)
    self.results_frame.rowconfigure(0, weight=1)

    # Treeview with one column per scanned attribute - make sure it expands
    self.scan_results_tree = ttk.Treeview(
        self.results_frame,
        columns=("ip", "delay", "hostname", "mac", "vendor"),  # Added delay column
        show="headings",
        height=10,
    )

    # Define columns
    self.scan_results_tree.heading("ip", text="IP Address")
    self.scan_results_tree.heading(
        "delay", text="Ping (ms)"
    )  # New column for ping delay
    self.scan_results_tree.heading("hostname", text="Hostname")
    self.scan_results_tree.heading("mac", text="MAC Address")
    self.scan_results_tree.heading("vendor", text="MAC Vendor")

    # Set column widths with adjusted proportions (the fractions sum to 1.0)
    total_width = 650  # Base total width for the columns
    self.scan_results_tree.column(
        "ip", width=int(total_width * 0.15), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "delay", width=int(total_width * 0.08), anchor="center", stretch=True
    )
    self.scan_results_tree.column(
        "hostname", width=int(total_width * 0.27), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "mac", width=int(total_width * 0.20), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "vendor", width=int(total_width * 0.30), anchor="w", stretch=True
    )

    # Add scrollbar
    self.scan_results_scrollbar = ttk.Scrollbar(
        self.results_frame, orient="vertical", command=self.scan_results_tree.yview
    )
    self.scan_results_tree.configure(yscrollcommand=self.scan_results_scrollbar.set)

    # Make sure the treeview fills the available space
    self.scan_results_tree.grid(row=0, column=0, sticky="nsew")
    self.scan_results_scrollbar.grid(row=0, column=1, sticky="ns")
|
|
|
|
def clear_log(self):
    """Erase everything currently shown in the operation log."""
    log_widget = self.log_text
    first_char, last_char = "1.0", tk.END
    log_widget.delete(first_char, last_char)
|
|
|
|
def log_message(self, message: str, tag: Optional[str] = None):
    """Append a timestamped line to the operation log.

    Args:
        message: Text to log.
        tag: Name of the text tag (color/format) to apply. When omitted, a
            tag is inferred from keywords in *message*; messages that match
            no keyword are inserted untagged.

    If the log widget has not been created yet (this method can be reached
    before ``create_widgets()`` has run, e.g. while loading saved data), the
    line is printed to stdout instead of raising ``AttributeError``.
    """
    # Try to automatically detect message type if tag not specified.
    # The checks run in priority order; the first match wins.
    if tag is None:
        lowered = message.lower()
        if "ERROR:" in message or "Error" in message or "failed" in lowered:
            tag = "ERROR"
        elif (
            "SUCCESS" in message
            or "Successfully" in message
            or "completed" in message
        ):
            tag = "SUCCESS"
        elif "WARNING" in message or "Warn" in message:
            tag = "WARN"
        elif "Host discovered:" in message:
            tag = "DISCOVERY"
        elif (
            "Reply from" in message
            or "ping" in lowered
            or "Request timed out" in message
        ):
            tag = "PING"
        elif "Hostname:" in message:
            tag = "HOSTNAME"
        elif "MAC Address:" in message:
            tag = "MAC"
        elif "MAC Vendor:" in message or "Vendor:" in message:
            tag = "VENDOR"
        elif "Executing" in message and "command" in lowered:
            tag = "COMMAND"
        elif "INFO:" in message:
            tag = "INFO"

    timestamp = time.strftime("%H:%M:%S", time.localtime())

    log_widget = getattr(self, "log_text", None)
    if log_widget is None:
        # No log widget yet: keep the message visible rather than crashing.
        print(f"[{timestamp}] {message}")
        return

    # Insert the timestamp and message
    log_widget.insert(tk.END, f"[{timestamp}] ", "timestamp")

    # Insert the message with appropriate tag
    if tag:
        log_widget.insert(tk.END, f"{message}\n", tag)
    else:
        log_widget.insert(tk.END, f"{message}\n")

    # Scroll to see the latest message
    log_widget.see(tk.END)
    log_widget.update_idletasks()
|
|
|
|
def load_ip_history(self):
    """Load the saved IP-prefix history into ``self.ip_history``.

    The history file is expected to contain a JSON list. A missing file
    yields an empty history silently; an unreadable, malformed or wrongly
    shaped file is reported through ``log_message`` and also yields an
    empty history. ``self.ip_history`` is always a list afterwards.
    """
    history = []
    try:
        with open(self.config.history_file, "r") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        pass  # first run: nothing saved yet
    except (OSError, ValueError) as e:  # ValueError covers bad JSON/encoding
        self.log_message(f"Error loading IP history: {e}")
    else:
        if isinstance(loaded, list):
            history = loaded
        else:
            # Later code inserts into and slices the history, so a dict or
            # string must not be accepted here.
            self.log_message(
                f"Error loading IP history: expected a JSON list in {self.config.history_file}"
            )
    self.ip_history = history
|
|
|
|
def save_ip_history(self):
    """Write ``self.ip_history`` to the history file as JSON.

    Failures are reported through ``log_message`` and never raised.
    """
    history_path = self.config.history_file
    try:
        with open(history_path, "w") as handle:
            json.dump(self.ip_history, handle)
    except Exception as exc:
        self.log_message(f"Error saving IP history: {exc}")
|
|
|
|
def add_to_history(self, ip_prefix: str):
    """Record a new IP prefix at the front of the history.

    Empty prefixes and prefixes already present are ignored. The history is
    capped at 15 entries, then saved and shown in the combo box.
    """
    if not ip_prefix:
        return
    history = self.ip_history
    if ip_prefix in history:
        return

    history.insert(0, ip_prefix)
    self.ip_history = history[:15]  # Keep only the 15 most recent prefixes
    self.save_ip_history()
    self.update_history_display()
|
|
|
|
def update_history_display(self):
    """Show the stored history in the combo box and select its first entry."""
    entries = self.ip_history
    self.history_combo["values"] = entries
    if not entries:
        return
    self.history_combo.set(entries[0])
|
|
|
|
def on_history_selected(self, event):
    """Apply the history entry the user picked as the current IP prefix.

    The selection is cut down to its first three octets, stored in the
    prefix field and the saved configuration, logged, and then used to
    refresh the ping target. Selections with fewer than three dot-separated
    parts are ignored.
    """
    chosen = self.history_var.get()
    if not chosen:
        return

    # Ensure we only keep the first three octets
    octets = chosen.split(".")
    if len(octets) < 3:
        return
    ip_prefix = ".".join(octets[:3])

    self.ip_prefix.set(ip_prefix)
    self.config.last_ip_prefix = ip_prefix
    self.config.save_config()
    self.log_message(f"Selected IP prefix from history: {ip_prefix}")

    # Update ping target for the new IP prefix
    self.update_ping_target(ip_prefix)
|
|
|
|
def refresh_interfaces(self):
    """Re-detect network interfaces and repopulate the interface selector.

    Detection uses WMI when available and netsh otherwise. If the
    interface remembered in the configuration is still listed, it is
    re-selected. Any failure is logged and shown to the user instead of
    being raised.
    """
    self.interfaces.clear()
    try:
        if not WMI_AVAILABLE:
            self._refresh_interfaces_netsh()
        else:
            self._refresh_interfaces_wmi()

        # Update the combobox with the active interfaces only
        active_names = [entry.name for entry in self.interfaces if entry.active]
        self.if_combo["values"] = active_names

        # Re-select the last used interface if it is still available
        remembered = self.config.last_interface
        if remembered in self.if_combo["values"]:
            self.if_combo.set(remembered)
            self.on_interface_selected()

        self.log_message("Interfaces refreshed successfully")
    except Exception as exc:
        failure = f"Error refreshing interfaces: {str(exc)}"
        self.log_message(failure)
        self.show_error(failure)
|
|
|
|
def _refresh_interfaces_wmi(self):
    """Append one ``NetworkInterface`` per IP-enabled physical adapter (WMI).

    Adapters are paired with their configuration through ``InterfaceIndex``;
    adapters without a configuration, or whose configuration is not
    IP-enabled, are skipped.

    Raises:
        Exception: Any failure, re-raised with a "WMI refresh error" prefix
            and the original exception attached as ``__cause__``.
    """
    try:
        adapters = self.wmi_client.Win32_NetworkAdapter(PhysicalAdapter=True)
        configs = self.wmi_client.Win32_NetworkAdapterConfiguration()

        # Index the configurations once instead of rescanning the list for
        # every adapter; setdefault keeps the first entry per index.
        config_by_index = {}
        for cfg in configs:
            config_by_index.setdefault(cfg.InterfaceIndex, cfg)

        for adapter in adapters:
            config = config_by_index.get(adapter.InterfaceIndex)
            if not (config and config.IPEnabled):
                continue

            interface = NetworkInterface(
                name=adapter.NetConnectionID,
                description=adapter.Description,
                # Only the first address / mask / gateway is kept
                ip_address=config.IPAddress[0] if config.IPAddress else "",
                subnet_mask=config.IPSubnet[0] if config.IPSubnet else "",
                gateway=(
                    config.DefaultIPGateway[0]
                    if config.DefaultIPGateway
                    else ""
                ),
                dhcp_enabled=config.DHCPEnabled,
                active=adapter.NetEnabled,
                adapter_id=adapter.GUID,
            )
            self.interfaces.append(interface)
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"WMI refresh error: {str(e)}") from e
|
|
|
|
def _refresh_interfaces_netsh(self):
    """Append one ``NetworkInterface`` per interface reported by netsh.

    Runs ``netsh interface ipv4 show config`` and parses each blank-line
    separated section. Sections without a recognizable interface name are
    skipped; missing fields default to ``""`` (DHCP defaults to enabled).
    Every parsed interface is marked active.

    NOTE(review): the patterns below only match Spanish-language netsh
    output; on other display languages no interface will be found.

    Raises:
        Exception: When netsh cannot be run or exits with an error
            ("Error executing netsh"), or when parsing fails ("Error
            parsing netsh output"). The original exception is attached as
            ``__cause__``.
    """
    try:
        # Argument list instead of a shell string: no shell is needed for
        # a fixed command.
        output = subprocess.check_output(
            ["netsh", "interface", "ipv4", "show", "config"], text=True
        )
        configs = output.split("\n\n")

        for config in configs:
            if not config.strip():
                continue

            name_match = re.search(r"Configuración de\s+\"(.+?)\"", config)
            if not name_match:
                continue

            name = name_match.group(1)

            ip_match = re.search(r"Dirección IP:\s+(\d+\.\d+\.\d+\.\d+)", config)
            mask_match = re.search(
                r"Máscara de subred:\s+(\d+\.\d+\.\d+\.\d+)", config
            )
            gateway_match = re.search(
                r"Puerta de enlace predeterminada:\s+(\d+\.\d+\.\d+\.\d+)", config
            )
            dhcp_match = re.search(r"DHCP habilitado:\s+(\w+)", config)

            interface = NetworkInterface(
                name=name,
                description=name,  # netsh gives no separate description
                ip_address=ip_match.group(1) if ip_match else "",
                subnet_mask=mask_match.group(1) if mask_match else "",
                gateway=gateway_match.group(1) if gateway_match else "",
                dhcp_enabled=(
                    dhcp_match.group(1).lower() == "sí" if dhcp_match else True
                ),
                active=True,
            )
            self.interfaces.append(interface)

    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers netsh being missing now that no shell is involved.
        raise Exception(f"Error executing netsh: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Error parsing netsh output: {str(e)}") from e
|
|
|
|
def on_interface_selected(self, event=None):
    """Show the selected interface's settings and remember the selection.

    Does nothing when the name in the selector matches no known interface.
    Otherwise the current-configuration fields are filled in, the IP prefix
    and ping target follow the interface's address (if it has one), the
    selection is saved and logged, and the scan range is updated when both
    address and mask are known.
    """
    selected = self.if_var.get()
    matches = [entry for entry in self.interfaces if entry.name == selected]
    if not matches:
        return
    interface = matches[0]

    self.current_ip_var.set(interface.ip_address)
    self.current_mask_var.set(interface.subnet_mask)
    self.current_gateway_var.set(interface.gateway)
    dhcp_label = "Enabled" if interface.dhcp_enabled else "Disabled"
    self.dhcp_status_var.set(dhcp_label)

    # Update the IP prefix if there is an address
    if interface.ip_address:
        leading_octets = interface.ip_address.split(".")[:3]
        prefix = ".".join(leading_octets)
        self.ip_prefix.set(prefix)

        # Update ping target for the new IP prefix
        self.update_ping_target(prefix)

    # Save the selected interface
    self.config.last_interface = selected
    self.config.save_config()

    self.log_message(f"Selected interface: {selected}")

    # Update scan IP range based on current IP and subnet mask
    if interface.ip_address and interface.subnet_mask:
        self.update_scan_ip_range(interface.ip_address, interface.subnet_mask)
|
|
|
|
def validate_ip_input(self) -> tuple[bool, str]:
    """Validate the prefix, last-octet and subnet-mask entry fields.

    Returns:
        ``(True, ip)`` with the assembled dotted-quad address when every
        field is valid, otherwise ``(False, message)`` describing the first
        problem found.
    """
    try:
        prefix = self.ip_prefix.get().strip()
        last_octet = self.last_octet.get().strip()
        subnet_mask = self.subnet_mask.get().strip()

        # The prefix must be exactly three numeric groups. fullmatch
        # rejects trailing text such as a fourth octet, which would
        # otherwise only surface later as a generic address error.
        if not re.fullmatch(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}", prefix):
            return False, "IP prefix must be in format: xxx.xxx.xxx"

        # Validate the last octet. isascii() keeps digit-like characters
        # such as superscripts from reaching int().
        octet_is_number = last_octet.isascii() and last_octet.isdigit()
        if not octet_is_number or not 0 <= int(last_octet) <= 255:
            return False, "Last octet must be between 0 and 255"

        # Validate the subnet mask
        if not self.is_valid_subnet_mask(subnet_mask):
            return False, "Invalid subnet mask format"

        # Build the full address and let ipaddress do the range checks
        ip = f"{prefix}.{last_octet}"
        ipaddress.IPv4Address(ip)

        return True, ip

    except ValueError as e:
        return False, f"Invalid IP address: {str(e)}"
|
|
|
|
def execute_admin_script(
    self,
    interface: str,
    mode: str,
    debug: bool = True,
    subnet_mask: str = "255.255.255.0",
) -> bool:
    """Run ``ip-changer-admin.py`` with elevated privileges.

    Args:
        interface: Name of the network interface to configure.
        mode: Either the literal ``"dhcp"`` or the static IP address to set.
        debug: When true, log the resolved paths and the full command line.
        subnet_mask: Mask used for a static address; not used for DHCP.

    Returns:
        True when ``ShellExecuteW`` returns a value above 32 (its success
        convention), False when the admin script is missing, the call fails,
        or an exception is raised.
    """
    # Resolve absolute paths; the admin script is expected next to this file.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(current_dir, "ip-changer-admin.py")
    python_exe = sys.executable

    # Make sure the script exists before asking for elevation.
    if not os.path.exists(script_path):
        self.log_message(f"Admin script not found at: {script_path}")
        return False

    # Build the argument line.
    if mode != "dhcp":
        # For a static IP, pass the complete netsh command; the gateway is
        # assumed to be host .1 of the address's first three octets.
        gateway = f"{self.get_ip_prefix(mode)}.1"
        netsh_cmd = f'netsh interface ip set address "{interface}" static {mode} {subnet_mask} {gateway}'
        # NOTE(review): netsh_cmd contains double quotes and is itself wrapped
        # in double quotes, so an interface name with spaces may be split into
        # several arguments. Left unchanged because the admin script's parsing
        # is not visible from here; confirm before altering the quoting.
        args = f'"{script_path}" "{interface}" "{netsh_cmd}"'
    else:
        # For DHCP, keep it simple.
        args = f'"{script_path}" "{interface}" "dhcp"'

    if debug:
        self.log_message("Debug Information:")
        self.log_message(f"Current Directory: {current_dir}")
        self.log_message(f"Script Path: {script_path}")
        self.log_message(f"Python Executable: {python_exe}")
        self.log_message(f"Full Command: {python_exe} {args}")

    try:
        self.log_message(
            "Attempting to execute admin script with elevated privileges"
        )

        ret = ctypes.windll.shell32.ShellExecuteW(
            None,  # hwnd
            "runas",  # operation
            python_exe,  # file
            args,  # parameters
            current_dir,  # directory
            1,  # show command
        )

        result_code = int(ret)
        if result_code > 32:
            self.log_message(
                f"ShellExecuteW successful (return code: {result_code})"
            )
            return True

        error_codes = {
            0: "The operating system is out of memory or resources",
            2: "The specified file was not found",
            3: "The specified path was not found",
            5: "Access denied",
            8: "Insufficient memory to complete the operation",
            11: "Bad format",
            26: "A sharing violation occurred",
            27: "The file name association is incomplete or invalid",
            28: "The DDE operation timed out",
            29: "The DDE operation failed",
            30: "The DDE operation is busy",
            31: "The file name association is unavailable",
            32: "No application is associated with the file",
        }
        # Look the code up in the table; previously an undefined name was
        # formatted here, which raised and hid the real error.
        error_msg = error_codes.get(result_code, "Unknown error")
        self.log_message(
            f"Failed to execute admin script. Error code {result_code}: {error_msg}"
        )
        return False

    except Exception as e:
        self.log_message(f"Exception while executing admin script: {str(e)}")
        return False
|
|
|
|
def save_previous_config(self, interface_name: str):
    """Store the interface's current configuration before it is changed.

    The settings of the interface named ``interface_name`` are written to
    ``self.config.previous_config`` and persisted. Nothing happens when no
    interface with that name is known.
    """
    found = next(
        filter(lambda inf: inf.name == interface_name, self.interfaces), None
    )
    if not found:
        return

    snapshot = {
        "name": found.name,
        "ip_address": found.ip_address,
        "subnet_mask": found.subnet_mask,
        "gateway": found.gateway,
        "dhcp_enabled": found.dhcp_enabled,
    }
    self.config.previous_config = snapshot
    self.config.save_config()
    self.log_message(f"Previous configuration saved for {found.name}")
|
|
|
|
def restore_previous(self):
    """Restore the IP configuration saved in ``self.config.previous_config``.

    Shows an error and returns when there is no saved configuration, when it
    has no interface name, or when that interface is no longer present.
    Otherwise the interface is switched back to DHCP or to the saved static
    address (with the saved subnet mask) through the elevated admin script,
    and the interface list is refreshed after a short delay.
    """
    prev_config = self.config.previous_config
    if not prev_config:
        self.show_error("No previous configuration available")
        return

    interface_name = prev_config.get("name")
    if not interface_name:
        self.show_error("Invalid previous configuration")
        return

    # Make sure the interface still exists.
    if not any(inf.name == interface_name for inf in self.interfaces):
        self.show_error(f"Interface {interface_name} not found")
        return

    self.log_message(f"Restoring previous configuration for {interface_name}")

    if prev_config.get("dhcp_enabled", True):
        # The previous configuration was DHCP.
        if self.execute_admin_script(interface_name, "dhcp"):
            self.log_message(f"Successfully restored DHCP on {interface_name}")
            time.sleep(2)  # give the change time to apply before refreshing
            self.refresh_interfaces()
        else:
            self.show_error("Failed to restore DHCP configuration")
        return

    # The previous configuration was a static IP.
    ip = prev_config.get("ip_address")
    if not ip:
        self.show_error("Previous IP address not available")
        return

    # Restore the saved mask too; fall back to the admin helper's default
    # only when none was recorded. Previously the default was always used.
    subnet_mask = prev_config.get("subnet_mask") or "255.255.255.0"

    if self.execute_admin_script(interface_name, ip, subnet_mask=subnet_mask):
        self.log_message(f"Successfully restored IP {ip} on {interface_name}")
        time.sleep(2)  # give the change time to apply before refreshing
        self.refresh_interfaces()
    else:
        self.show_error("Failed to restore static IP configuration")
|
|
|
|
def do_ping(self):
    """Start a ping of the address in ``self.ping_target`` on a worker thread.

    Shows an error when the target is empty. Otherwise the target is saved
    for the current IP prefix and either a continuous ping or a single ping
    run is started in a daemon thread, depending on ``self.continuous_ping``.
    """
    target = self.ping_target.get().strip()
    if not target:
        self.show_error("Please enter a target IP or hostname")
        return

    # Remember the ping target whenever it is used.
    self.save_current_ping_target()

    if self.continuous_ping.get():
        self.log_message(f"Starting continuous ping to {target}...")
        # Reset the stop event so the new loop is not cancelled immediately.
        self.ping_stop_event.clear()
        self.ping_running = True
        worker = self._execute_continuous_ping
    else:
        self.log_message(f"Pinging {target}...")
        worker = self._execute_ping

    # Run the ping off the calling thread.
    threading.Thread(target=worker, args=(target,), daemon=True).start()
|
|
|
|
def stop_ping(self):
    """Stop an ongoing continuous ping, if one is running."""
    if not self.ping_running:
        return

    self.log_message("Stopping continuous ping...")
    self.ping_stop_event.set()
    self.ping_running = False
    # Reset the displayed ping time once stopped.
    self.last_ping_time.set("N/A")
|
|
|
|
def _execute_continuous_ping(self, target: str):
    """Ping ``target`` about once per second until the stop event is set.

    Uses ``pythonping`` when ``PING_LIBRARY_AVAILABLE`` is true, otherwise
    falls back to timing a TCP connection to port 80. Each attempt is logged
    and ``self.last_ping_time`` is updated. When the loop ends,
    ``self.ping_running`` is cleared and the displayed time is reset.

    Args:
        target: IP address or hostname to ping.
    """
    try:
        self.log_message(
            f"Continuous ping to {target} started. Press Stop to terminate."
        )
        count = 0

        while not self.ping_stop_event.is_set():
            count += 1
            if PING_LIBRARY_AVAILABLE:
                try:
                    # Send a single ping and take the first successful reply.
                    response = pythonping(target, count=1, timeout=1)
                    reply = next((r for r in response if r.success), None)

                    if reply is not None:
                        rtt = reply.time_elapsed_ms
                        self.last_ping_time.set(f"{rtt:.1f} ms")
                        self.log_message(
                            f"Reply from {target}: time={rtt:.2f}ms (seq={count})"
                        )
                    else:
                        self.last_ping_time.set("Timeout")
                        self.log_message(f"Request timed out (seq={count})")
                except Exception as ping_error:
                    self.last_ping_time.set("Error")
                    self.log_message(f"Ping error: {str(ping_error)} (seq={count})")
            else:
                # Fallback: time a TCP connect instead of an ICMP echo.
                try:
                    start_time = time.time()
                    # The context manager closes the socket even when
                    # connect_ex raises (e.g. the name cannot be resolved).
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(1)
                        result = sock.connect_ex((target, 80))
                        elapsed_time = (time.time() - start_time) * 1000  # ms

                    if result == 0:
                        self.last_ping_time.set(f"{elapsed_time:.1f} ms")
                        self.log_message(
                            f"Connected to {target}:80 in {elapsed_time:.2f}ms (seq={count})"
                        )
                    else:
                        self.last_ping_time.set("Failed")
                        self.log_message(
                            f"Failed to connect to {target}:80 (seq={count})"
                        )
                except Exception as socket_error:
                    self.last_ping_time.set("Error")
                    self.log_message(
                        f"Connection error: {str(socket_error)} (seq={count})"
                    )

            # Wait one second before the next ping, checking for a stop
            # request every 100 ms so that stopping feels responsive.
            for _ in range(10):
                if self.ping_stop_event.is_set():
                    break
                time.sleep(0.1)

        self.log_message(
            f"Continuous ping to {target} stopped after {count} pings."
        )

    except Exception as e:
        self.log_message(f"Error in continuous ping: {str(e)}")
    finally:
        self.ping_running = False
        self.last_ping_time.set("N/A")  # reset on stop
|
|
|
|
def _execute_ping(self, target: str):
    """Send four pings to ``target`` and log replies and summary statistics.

    Uses ``pythonping`` when ``PING_LIBRARY_AVAILABLE`` is true, otherwise
    times TCP connections to port 80. ``self.last_ping_time`` shows the most
    recent successful time, or "Timeout"/"Failed"/"Error" when nothing
    succeeded. Afterwards the target's hostname, MAC address and MAC vendor
    are looked up and logged when available.

    Args:
        target: IP address or hostname to ping.
    """
    attempts = 4  # number of probes; the statistics below are based on it

    try:
        # Show that a test is in progress.
        self.last_ping_time.set("Testing...")

        self.log_message(
            f"Pinging {target} with {attempts} echo requests...", "PING"
        )

        if PING_LIBRARY_AVAILABLE:
            response = pythonping(target, count=attempts, timeout=1)

            # Log each reply and collect the successful round-trip times.
            rtts = []
            for seq, reply in enumerate(response, start=1):
                if reply.success:
                    rtt = reply.time_elapsed_ms
                    rtts.append(rtt)
                    self.log_message(
                        f"Reply from {target}: time={rtt:.2f}ms", "PING"
                    )
                    # Keep the display on the most recent successful ping.
                    self.last_ping_time.set(f"{rtt:.1f} ms")
                else:
                    self.log_message(f"Request timed out (seq={seq})", "PING")

            if not rtts:
                self.last_ping_time.set("Timeout")

            success_count = len(rtts)
            lost = attempts - success_count
            loss_percentage = lost * 100 // attempts

            self.log_message(f"\nPing statistics for {target}:")
            self.log_message(
                f" Packets: Sent = {attempts}, Received = {success_count}, Lost = {lost} ({loss_percentage}% loss)"
            )

            if rtts:
                avg_rtt = sum(rtts) / success_count
                self.log_message("Approximate round trip times in milliseconds:")
                self.log_message(
                    f" Minimum = {min(rtts):.2f}ms, Maximum = {max(rtts):.2f}ms, Average = {avg_rtt:.2f}ms"
                )
        else:
            # Fallback when pythonping is not available.
            self.log_message("Using socket connection test (not a true ICMP ping)")

            times = []
            for seq in range(1, attempts + 1):
                try:
                    start_time = time.time()
                    # The context manager closes the socket even when
                    # connect_ex raises (e.g. the name cannot be resolved).
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(1)
                        result = sock.connect_ex((target, 80))  # try port 80
                        elapsed_time = (time.time() - start_time) * 1000  # ms

                    if result == 0:
                        times.append(elapsed_time)
                        self.log_message(
                            f"Connected to {target}:80 in {elapsed_time:.2f}ms"
                        )
                        # Mirror the continuous-ping fallback; previously the
                        # display stayed on "Testing..." in this mode.
                        self.last_ping_time.set(f"{elapsed_time:.1f} ms")
                    else:
                        self.log_message(
                            f"Failed to connect to {target}:80 (seq={seq})"
                        )
                except Exception as e:
                    self.log_message(f"Connection error: {str(e)} (seq={seq})")

            if not times:
                self.last_ping_time.set("Failed")

            success_count = len(times)
            failed = attempts - success_count
            loss_percentage = failed * 100 // attempts
            self.log_message(f"\nConnection statistics for {target}:")
            self.log_message(
                f" Attempts: Sent = {attempts}, Successful = {success_count}, Failed = {failed} ({loss_percentage}% loss)"
            )

            if times:
                self.log_message("Approximate connection times in milliseconds:")
                self.log_message(
                    f" Minimum = {min(times):.2f}ms, Maximum = {max(times):.2f}ms, Average = {sum(times)/len(times):.2f}ms"
                )

        # Try to get the hostname and MAC address of the target.
        hostname = self.get_hostname(target)
        mac_address = self.get_mac_address(target)
        vendor = ""

        # Look up the vendor only when a MAC address was found.
        if mac_address and MAC_LOOKUP_AVAILABLE:
            vendor = self.get_mac_vendor(mac_address)

        if hostname:
            self.log_message(f"Hostname: {hostname}", "HOSTNAME")
        if mac_address:
            self.log_message(f"MAC Address: {mac_address}", "MAC")
        if vendor:
            self.log_message(f"MAC Vendor: {vendor}", "VENDOR")

    except Exception as e:
        self.last_ping_time.set("Error")
        self.log_message(f"Error executing ping: {str(e)}", "ERROR")
|
|
|
|
def start_scan(self):
    """Validate the scan range and start a network scan on a worker thread.

    Reads the start and end addresses from ``self.scan_start_ip`` and
    ``self.scan_end_ip``. Shows an error when either is not a valid IPv4
    address or when the start lies above the end. Otherwise previous results
    are cleared, the progress bar is reset and ``_execute_scan`` is started
    in a daemon thread.
    """
    first_text = self.scan_start_ip.get().strip()
    last_text = self.scan_end_ip.get().strip()

    try:
        # Validate both addresses; IPv4Address raises ValueError on bad input.
        first = ipaddress.IPv4Address(first_text)
        last = ipaddress.IPv4Address(last_text)

        if first > last:
            self.show_error("Start IP must be lower than or equal to End IP")
            return

        # Clear previous results from the tree and the result list.
        tree = self.scan_results_tree
        for row_id in tree.get_children():
            tree.delete(row_id)
        self.scan_results = []

        # The number of addresses is the progress bar's maximum.
        address_count = int(last) - int(first) + 1
        self.scan_progress_var.set(0)
        self.scan_progress["maximum"] = address_count

        # Show how many nodes will be scanned.
        self.nodes_to_scan.set(str(address_count))

        self.log_message(
            f"Starting scan from {first_text} to {last_text} ({address_count} addresses)..."
        )

        # Reset the stop event and mark the scan as running.
        self.scan_stop_event.clear()
        self.scan_running = True

        # Run the scan off the calling thread.
        scanner = threading.Thread(
            target=self._execute_scan,
            args=(first, last, address_count),
            daemon=True,
        )
        scanner.start()

    except ValueError as e:
        self.show_error(f"Invalid IP address: {str(e)}")
|
|
|
|
def stop_scan(self):
    """Stop an ongoing network scan, if one is running."""
    if not self.scan_running:
        return

    self.log_message("Stopping network scan...")
    self.scan_stop_event.set()

    # Give each worker that is still alive up to half a second to finish.
    still_running = (worker for worker in self.scan_threads if worker.is_alive())
    for worker in still_running:
        worker.join(0.5)

    self.scan_running = False
    self.log_message("Network scan stopped.")
|
|
|
|
def _execute_scan(self, start_ip, end_ip, ip_range):
    """Run the ping scan over every address from ``start_ip`` to ``end_ip``.

    The addresses are queued on ``self.scan_queue`` and processed by up to
    20 ``_scan_worker`` threads. The method waits until the workers finish
    or the scan is no longer marked as running, then logs a summary unless
    the scan was stopped. ``self.scan_running`` is always cleared at the end.

    Args:
        start_ip: First address of the range (convertible with ``int()``).
        end_ip: Last address of the range, inclusive.
        ip_range: Number of addresses in the range; caps the worker count.
    """
    try:
        pending = self.scan_queue

        # Drop anything left over from an earlier scan.
        while not pending.empty():
            pending.get_nowait()

        # Start with no workers and nothing completed.
        self.scan_threads = []
        self.scan_completed = 0

        # Clear the previous scan's rows from the results tree.
        tree = self.scan_results_tree
        for row_id in tree.get_children():
            tree.delete(row_id)

        # Queue every address in the range, end address included.
        first_int = int(start_ip)
        last_int = int(end_ip)
        for address_int in range(first_int, last_int + 1):
            pending.put(ipaddress.IPv4Address(address_int))

        # Scan in parallel: 20 workers, or fewer when the range is small.
        worker_count = min(20, ip_range)
        for _ in range(worker_count):
            worker = threading.Thread(target=self._scan_worker, daemon=True)
            worker.start()
            self.scan_threads.append(worker)

        # Wait for the workers to finish or for the scan to be stopped.
        while self.scan_running and any(w.is_alive() for w in self.scan_threads):
            time.sleep(0.1)

        if not self.scan_stop_event.is_set():
            self.log_message(
                f"Scan completed. Found {len(self.scan_results)} active hosts. Use 'Get Host Info' to lookup hostnames and MAC addresses."
            )

    except Exception as e:
        self.log_message(f"Error in network scan: {str(e)}")
    finally:
        self.scan_running = False
|
|
|
|
def _scan_worker(self):
    """Worker loop: ping queued addresses until the queue is empty or stopped.

    Each address taken from ``self.scan_queue`` is pinged with
    ``_ping_host_with_time``. Active hosts are logged, appended to
    ``self.scan_results`` and added to the results tree through
    ``self.master.after``. Every dequeued address is counted as completed
    and marked done on the queue, even when processing it raised.
    """
    while not self.scan_stop_event.is_set():
        # Take the next address; a short timeout lets the loop notice both
        # an exhausted queue and a stop request.
        try:
            ip_address = self.scan_queue.get(timeout=0.1)
        except queue.Empty:
            break

        try:
            try:
                ip_str = str(ip_address)
                is_active, ping_delay = self._ping_host_with_time(ip_str)

                # Record active hosts; host details are gathered later.
                if is_active:
                    self.log_message(
                        f"Host discovered: {ip_str} ({ping_delay:.1f}ms)", "DISCOVERY"
                    )
                    self.scan_results.append(ip_str)

                    # Add a row with placeholder values and the ping time.
                    # Defaults bind the current values to the lambda.
                    self.master.after(
                        0,
                        lambda ip=ip_str, delay=ping_delay: self.add_scan_result(
                            ip, delay, "", "", ""
                        ),
                    )
            finally:
                # Account for the item even when scanning it failed, so the
                # queue's unfinished-task count and the progress bar stay
                # consistent with the number of dequeued addresses.
                self.scan_queue.task_done()
                self.scan_completed = self.scan_completed + 1
                self.scan_progress_var.set(self.scan_completed)

        except Exception as e:
            self.log_message(f"Error in scan worker: {str(e)}")
|
|
|
|
def gather_host_information(self):
    """Start gathering hostname and MAC information for discovered hosts.

    Shows an informational message when no hosts have been discovered yet;
    otherwise ``_execute_host_gathering`` is run in a daemon thread.
    """
    if self.scan_results:
        # Do the lookups off the calling thread.
        gatherer = threading.Thread(
            target=self._execute_host_gathering, daemon=True
        )
        gatherer.start()
        return

    self.show_info("No hosts discovered yet. Run a scan first.")
|
|
|
|
def _execute_host_gathering(self):
    """Look up hostname, MAC address and vendor for each discovered host.

    Iterates ``self.scan_results``, logs what was found for each address,
    updates the matching tree row and advances the progress bar. The loop
    ends early when ``self.scan_stop_event`` is set.
    """
    try:
        self.log_message(
            f"Gathering information for {len(self.scan_results)} hosts...", "INFO"
        )

        # Reuse the scan progress bar for this operation.
        host_total = len(self.scan_results)
        self.scan_progress_var.set(0)
        self.scan_progress["maximum"] = host_total

        for position, address in enumerate(self.scan_results, start=1):
            if self.scan_stop_event.is_set():
                self.log_message("Host information gathering stopped.", "WARN")
                break

            self.log_message(
                f"Getting information for host {address} ({position}/{host_total})..."
            )

            host_name = self.get_hostname(address)
            mac = self.get_mac_address(address)

            # A vendor lookup needs both a MAC address and the lookup library.
            vendor_name = ""
            if mac and MAC_LOOKUP_AVAILABLE:
                vendor_name = self.get_mac_vendor(mac)

            # Log only the fields that were actually found.
            labelled = (
                ("Hostname", host_name),
                ("MAC", mac),
                ("Vendor", vendor_name),
            )
            found = [f"{label}: {value}" for label, value in labelled if value]

            if found:
                self.log_message(f" {address} → " + " | ".join(found))
            else:
                self.log_message(f" {address} → No additional information found")

            # Show the gathered details in the results tree.
            self.update_host_in_tree(address, host_name, mac, vendor_name)

            self.scan_progress_var.set(position)

        self.log_message("Host information gathering completed.", "SUCCESS")

    except Exception as e:
        self.log_message(f"Error gathering host information: {str(e)}", "ERROR")
|
|
|
|
def update_host_in_tree(self, ip, hostname, mac, vendor=""):
    """Fill in hostname, MAC and vendor on the tree row whose first value is ``ip``.

    The row's existing delay value is preserved. Missing values are shown as
    "Not resolved", "Not found" and "Unknown". Only the first matching row
    is updated; errors are logged rather than raised.
    """
    try:
        tree = self.scan_results_tree
        for row_id in tree.get_children():
            row = tree.item(row_id, "values")
            if not row or row[0] != ip:
                continue

            # Keep whatever delay the row already shows.
            delay = row[1] if len(row) > 1 else ""

            tree.item(
                row_id,
                values=(
                    ip,
                    delay,
                    hostname or "Not resolved",
                    mac.upper() if mac else "Not found",  # uppercase MAC
                    vendor or "Unknown",
                ),
            )
            break
    except Exception as e:
        self.log_message(f"Error updating host in treeview: {str(e)}")
|
|
|
|
def add_scan_result(self, ip, delay=0, hostname="", mac="", vendor=""):
    """Append a row for ``ip`` to the scan results tree.

    Empty hostname, MAC and vendor values are replaced by "Not resolved",
    "Not found" and "Unknown". The delay is shown with one decimal place, or
    left blank when it is not positive. Errors are logged rather than raised.
    """
    try:
        row = (
            ip,
            f"{delay:.1f}" if delay > 0 else "",
            hostname or "Not resolved",
            mac or "Not found",
            vendor or "Unknown",
        )
        self.scan_results_tree.insert("", "end", values=row)
    except Exception as e:
        self.log_message(f"Error adding scan result to UI: {str(e)}")
|
|
|
|
def _ping_host_with_time(self, host: str) -> tuple[bool, float]:
    """Check whether ``host`` responds and measure how long it took.

    Uses ``pythonping`` when ``PING_LIBRARY_AVAILABLE`` is true, otherwise
    times a TCP connection to port 80. Both use a 0.5 s timeout.

    Args:
        host: IP address or hostname to probe.

    Returns:
        ``(True, delay_ms)`` when the host responded, ``(False, 0)`` when it
        did not or when any error occurred.
    """
    try:
        if PING_LIBRARY_AVAILABLE:
            # Short timeout: scanning probes many addresses.
            response = pythonping(host, count=1, timeout=0.5)
            if not response.success():
                return False, 0

            # Report the time of the first successful reply.
            ping_time = next(
                (reply.time_elapsed_ms for reply in response if reply.success), 0
            )
            return True, ping_time

        # Fallback: time a TCP connect to a common port.
        start_time = time.time()
        # The context manager closes the socket even when connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)
            result = sock.connect_ex((host, 80))
            ping_time = (time.time() - start_time) * 1000  # ms
        return result == 0, ping_time if result == 0 else 0

    except Exception:
        # Best effort: a probe that fails for any reason counts as "no host".
        # Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
        return False, 0
|
|
|
|
def set_static_ip(self):
    """Apply the static IP entered by the user to the selected interface.

    Shows an error when no interface is selected or the input is invalid.
    Otherwise the interface's current configuration is saved for a later
    restore, the prefix is added to the history and the elevated admin
    script is run. On success the interface list is refreshed after a short
    delay.
    """
    interface = self.if_var.get()
    if not interface:
        self.show_error("Please select a network interface")
        return

    # Validate first, so that a rejected input does not overwrite the saved
    # "previous" configuration with one that was never changed.
    is_valid, result = self.validate_ip_input()
    if not is_valid:
        self.show_error(result)
        return

    # Save the current configuration before changing it.
    self.save_previous_config(interface)

    ip = result
    # Validation works on stripped values; use the same values here so stray
    # whitespace does not reach the command line or the history.
    subnet_mask = self.subnet_mask.get().strip()
    ip_prefix = self.ip_prefix.get().strip()

    # Record the prefix in the history.
    self.add_to_history(ip_prefix)

    # Log the command that will be executed.
    gateway = f"{ip_prefix}.1"
    command = f'netsh interface ip set address "{interface}" static {ip} {subnet_mask} {gateway}'
    self.log_message(f"Executing network command: {command}")

    # Run the admin script with elevated privileges, passing the mask.
    if self.execute_admin_script(
        interface, ip, debug=True, subnet_mask=subnet_mask
    ):
        time.sleep(2)  # wait for the change to be applied
        self.refresh_interfaces()
        self.log_message(
            f"Successfully set static IP {ip} with mask {subnet_mask} on {interface}"
        )
    else:
        self.show_error("Failed to set static IP. Check the log for details.")
|
|
|
|
def set_dhcp(self):
    """Switch the selected interface to DHCP through the elevated admin script.

    Shows an error when no interface is selected or the script could not be
    run. The current configuration is saved beforehand; on success the
    interface list is refreshed after a short delay.
    """
    interface = self.if_var.get()
    if not interface:
        self.show_error("Please select a network interface")
        return

    # Save the current configuration before changing it.
    self.save_previous_config(interface)

    # Run the admin script with elevated privileges.
    launched = self.execute_admin_script(interface, "dhcp", debug=True)
    if not launched:
        self.show_error("Failed to enable DHCP. Check the log for details.")
        return

    time.sleep(2)  # wait for the change to be applied
    self.refresh_interfaces()
    self.log_message(f"Successfully enabled DHCP on {interface}")
|
|
|
|
def run_command(self, cmd: str) -> bool:
    """Run ``cmd`` through the shell, logging its output as it is produced.

    Standard output is logged line by line while the command runs. Standard
    error is collected in the background and logged once the command has
    finished.

    Args:
        cmd: Command line to execute. It is passed to the shell unchanged,
            so it must not be built from untrusted input.

    Returns:
        True when the command exits with status 0, False when it exits with
        another status or cannot be executed.
    """
    self.log_message(f"Executing command: {cmd}")
    try:
        # The context manager closes both pipes and reaps the process.
        with subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        ) as process:
            # Drain stderr concurrently. Reading stdout to the end while
            # stderr sits unread can deadlock once the child fills the
            # stderr pipe buffer.
            stderr_chunks = []
            stderr_reader = threading.Thread(
                target=lambda: stderr_chunks.append(process.stderr.read()),
                daemon=True,
            )
            stderr_reader.start()

            for line in process.stdout:
                self.log_message(line.strip())

            retcode = process.wait()
            stderr_reader.join()

        error_output = "".join(stderr_chunks)
        if error_output:
            self.log_message(f"Error output:\n{error_output}")

        if retcode == 0:
            self.log_message("Command executed successfully")
            return True

        self.log_message(f"Command failed with return code: {retcode}")
        return False

    except Exception as e:
        self.log_message(f"Error executing command: {str(e)}")
        return False
|
|
|
|
def is_admin(self):
    """Return the result of ``IsUserAnAdmin``, or False when it cannot be called.

    Any exception raised while reaching or calling the Windows shell API is
    logged and reported as False.
    """
    try:
        shell32 = ctypes.windll.shell32
        return shell32.IsUserAnAdmin()
    except Exception as e:
        self.log_message(f"Error checking admin privileges: {str(e)}")
        return False
|
|
|
|
def elevate_privileges(self):
    """Relaunch this script with administrator rights (Windows only).

    Informs the user, asks the shell to start the current script with the
    ``runas`` verb and quits this instance once the elevated one has been
    launched. When the launch fails or raises, the error is logged and
    shown, and this instance keeps running. Does nothing on other platforms.
    """
    if sys.platform != "win32":
        return

    self.show_info(
        "The application needs administrator privileges to change network settings. "
        "Please confirm the UAC prompt."
    )
    script_path = os.path.abspath(sys.argv[0])
    try:
        ret = ctypes.windll.shell32.ShellExecuteW(
            None, "runas", sys.executable, f'"{script_path}"', None, 1
        )
        # ShellExecuteW reports failure through its return value (32 or
        # less) rather than an exception. Previously the result was ignored
        # and the application quit even when no elevated instance started.
        result_code = int(ret)
        if result_code <= 32:
            raise OSError(f"ShellExecuteW failed with code {result_code}")
        self.master.quit()
    except Exception as e:
        self.log_message(f"Error elevating privileges: {str(e)}")
        self.show_error("Failed to elevate privileges")
|
|
|
|
def show_error(self, message: str):
    """Log ``message`` with the ERROR tag and show it in an error dialog."""
    log_line = f"ERROR: {message}"
    self.log_message(log_line, "ERROR")
    messagebox.showerror("Error", message)
|
|
|
|
def show_info(self, message: str):
    """Log ``message`` with the INFO tag and show it in an information dialog."""
    log_line = f"INFO: {message}"
    self.log_message(log_line, "INFO")
    messagebox.showinfo("Information", message)
|
|
|
|
# Add new methods for subnet mask conversion
|
|
def on_subnet_mask_changed(self, *args):
    """Keep the CIDR bits and scan range in sync with the subnet mask field.

    When the mask is valid, the CIDR field is set to its prefix length and,
    if a current IP is known, the scan range is recalculated. A re-entrancy
    flag stops the handler from running again while it is already active.

    Args:
        *args: Ignored; accepted so the method can be used as a callback.
    """
    # Skip nested invocations triggered by our own updates.
    if getattr(self, "_updating_mask", False):
        return

    try:
        self._updating_mask = True

        mask = self.subnet_mask.get()
        if not self.is_valid_subnet_mask(mask):
            return

        # Convert the mask to its prefix length.
        prefix_len = self.subnet_mask_to_cidr(mask)
        self.cidr_bits.set(str(prefix_len))

        # Recalculate the scan range when a current IP is available.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)

    except Exception as e:
        self.log_message(f"Error updating CIDR bits: {str(e)}")
    finally:
        self._updating_mask = False
|
|
|
|
def on_cidr_bits_changed(self, *args):
    """Keep the subnet mask and scan range in sync with the CIDR bits field.

    When the field holds a number from 0 to 32, the subnet mask field is set
    to the matching dotted mask and, if a current IP is known, the scan
    range is recalculated. A re-entrancy flag stops the handler from running
    again while it is already active.

    Args:
        *args: Ignored; accepted so the method can be used as a callback.
    """
    # Skip nested invocations triggered by our own updates.
    if getattr(self, "_updating_bits", False):
        return

    try:
        self._updating_bits = True

        bits_text = self.cidr_bits.get()
        if not bits_text.isdigit():
            return

        prefix_len = int(bits_text)
        if not 0 <= prefix_len <= 32:
            return

        # Convert the prefix length to a dotted mask.
        mask = self.cidr_to_subnet_mask(prefix_len)
        self.subnet_mask.set(mask)

        # Recalculate the scan range when a current IP is available.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)

    except Exception as e:
        self.log_message(f"Error updating subnet mask: {str(e)}")
    finally:
        self._updating_bits = False
|
|
|
|
def is_valid_subnet_mask(self, mask: str) -> bool:
    """Return True when ``mask`` is a dotted-decimal IPv4 subnet mask.

    A valid mask has four numeric octets in the range 0-255 whose 32 bits
    are a run of ones followed only by zeros. Any error during the check
    yields False.
    """
    try:
        octets = mask.split(".")
        if len(octets) != 4:
            return False

        # Fold the four octets into one 32-bit value, rejecting anything
        # that is not a number from 0 to 255.
        value = 0
        for text in octets:
            if not text.isdigit():
                return False
            number = int(text)
            if not 0 <= number <= 255:
                return False
            value = (value << 8) | number

        # Ones followed by zeros means the inverted value is zeros followed
        # by ones, i.e. of the form 2**k - 1; adding one to such a value
        # clears every bit it had set.
        inverted = value ^ 0xFFFFFFFF
        return (inverted & (inverted + 1)) == 0
    except Exception:
        return False
|
|
|
|
def subnet_mask_to_cidr(self, mask: str) -> int:
    """Return the number of set bits in the dotted-decimal ``mask``.

    The mask is not validated here. When an octet cannot be converted, the
    error is logged and 24 is returned as a default.
    """
    try:
        # Count the set bits octet by octet and add them up.
        return sum(bin(int(octet)).count("1") for octet in mask.split("."))
    except Exception as e:
        self.log_message(f"Error converting subnet mask to CIDR: {str(e)}")
        return 24  # default to /24
|
|
|
|
def cidr_to_subnet_mask(self, bits: int) -> str:
    """Return the dotted-decimal subnet mask for a prefix length of ``bits``.

    When the conversion fails, the error is logged and "255.255.255.0" is
    returned as a default.
    """
    try:
        # ``bits`` ones padded with zeros to 32 characters; only the first
        # 32 characters are used.
        bit_string = ("1" * bits).ljust(32, "0")[:32]

        # Convert each group of eight bits to its decimal value.
        return ".".join(
            str(int(bit_string[start : start + 8], 2)) for start in (0, 8, 16, 24)
        )
    except Exception as e:
        self.log_message(f"Error converting CIDR to subnet mask: {str(e)}")
        return "255.255.255.0"  # default to /24
|
|
|
|
def update_ping_target(self, ip_prefix):
    """Set the ping target field to the target stored for ``ip_prefix``.

    Does nothing when ``ip_prefix`` is empty.
    """
    if not ip_prefix:
        return

    stored_target = self.config.get_ping_target(ip_prefix)
    self.ping_target.set(stored_target)
|
|
|
|
def save_current_ping_target(self, event=None):
    """Store the current ping target under the current IP prefix.

    Nothing is saved when either the prefix or the target is empty.

    Args:
        event: Unused; accepted so the method can be bound as an event
            callback.
    """
    prefix = self.ip_prefix.get()
    destination = self.ping_target.get()

    if not (prefix and destination):
        return

    self.config.set_ping_target(prefix, destination)
    self.log_message(f"Saved ping target {destination} for IP prefix {prefix}")
|
|
|
|
def copy_to_clipboard(self, value):
    """Replace the clipboard contents with ``value`` and log the copy."""
    root = self.master
    root.clipboard_clear()
    root.clipboard_append(value)
    self.log_message(f"Copied to clipboard: {value}")
|
|
|
|
def copy_current_ip(self):
    """Copy the current IP address to the clipboard, if there is one."""
    address = self.current_ip_var.get()
    if not address:
        self.show_info("No IP address available to copy")
        return

    self.copy_to_clipboard(address)
|
|
|
|
def copy_current_gateway(self):
    """Copy the current gateway address to the clipboard, if there is one."""
    address = self.current_gateway_var.get()
    if not address:
        self.show_info("No gateway address available to copy")
        return

    self.copy_to_clipboard(address)
|
|
|
|
def update_scan_ip_range(self, ip_address, subnet_mask):
    """Set the scan range fields from an IP address and its subnet mask.

    Updates the scan CIDR field, the start and end addresses and the node
    count. For networks with more than two addresses, the network and
    broadcast addresses are left out of the range. Errors are logged rather
    than raised.

    Args:
        ip_address: Dotted-decimal IPv4 address inside the network.
        subnet_mask: Dotted-decimal subnet mask of the network.
    """
    try:
        # Parsing both values up front rejects malformed input early.
        ipaddress.IPv4Address(ip_address)
        mask_value = int(ipaddress.IPv4Address(subnet_mask))

        # The prefix length is the number of set bits in the mask.
        prefix_len = bin(mask_value).count("1")
        self.scan_cidr_bits.set(str(prefix_len))

        # strict=False allows a host address rather than the network address.
        network = ipaddress.IPv4Network(
            f"{ip_address}/{prefix_len}", strict=False
        )

        first = network.network_address
        last = network.broadcast_address
        if network.num_addresses > 2:
            # Skip the network address and the broadcast address.
            first = first + 1
            last = last - 1
        # Otherwise (/31 or /32) every address in the network is kept.

        self.scan_start_ip.set(str(first))
        self.scan_end_ip.set(str(last))

        node_total = int(last) - int(first) + 1
        self.nodes_to_scan.set(str(node_total))

        self.log_message(
            f"Updated scan range: {first} - {last} ({node_total} nodes)"
        )

    except Exception as e:
        self.log_message(f"Error calculating IP range: {str(e)}")
|
|
|
|
def update_scan_range_from_cidr(self, *args):
    """Recalculate the scan end address when the scan CIDR value changes.

    The start address is kept as entered; only the end address and the node
    count are updated. Nothing happens when the start address is empty or
    the CIDR value is not a number from 0 to 32. Errors are logged rather
    than raised.

    Args:
        *args: Ignored; accepted so the method can be used as a callback.
    """
    try:
        first_text = self.scan_start_ip.get().strip()
        bits_text = self.scan_cidr_bits.get().strip()

        if not (first_text and bits_text.isdigit()):
            return

        prefix_len = int(bits_text)
        if not 0 <= prefix_len <= 32:
            return

        # Use the start address to identify the network.
        network = ipaddress.IPv4Network(f"{first_text}/{prefix_len}", strict=False)

        # End at the last usable address; for /31 and /32 networks that is
        # the broadcast address itself.
        last = network.broadcast_address
        if network.num_addresses > 2:
            last = last - 1

        # Only the end address field is changed.
        self.scan_end_ip.set(str(last))

        first = ipaddress.IPv4Address(first_text)
        node_total = int(last) - int(first) + 1
        self.nodes_to_scan.set(str(node_total))

        self.log_message(
            f"Updated scan range from CIDR: {first_text} - {last} ({node_total} nodes)"
        )
    except Exception as e:
        self.log_message(f"Error updating scan range from CIDR: {str(e)}")
|
|
|
|
def get_hostname(self, ip_address):
    """Reverse-resolve ``ip_address`` to a host name.

    Args:
        ip_address: Address passed straight to ``socket.gethostbyaddr``.

    Returns:
        The primary host name reported by the resolver, or "" when the
        lookup fails for any reason. Failures are never logged or raised.
    """
    try:
        # gethostbyaddr yields (hostname, aliases, addresses); only the name is used.
        name, _aliases, _addresses = socket.gethostbyaddr(ip_address)
    except Exception:
        # Resolver errors (herror, gaierror, timeout) and anything else
        # are treated alike: the lookup is best-effort and stays silent.
        return ""
    return name
|
|
|
|
def get_mac_address(self, ip_address):
    """Return the MAC address the system ARP table holds for ``ip_address``.

    Runs ``arp -a <ip>`` (2 second timeout) and parses its standard output.
    The address is validated as a literal IP first and the command is
    started from an argument list without a shell, so arbitrary text in
    ``ip_address`` can never be interpreted as shell syntax.

    Args:
        ip_address: IP address to look up (string or ``ipaddress`` object).

    Returns:
        The MAC address upper-cased, with the separators exactly as printed
        by ``arp``, or "" when the input is not a valid IP address, the
        command fails or times out, or no MAC address is found. Failures
        are never logged or raised.
    """
    try:
        # Reject anything that is not a literal IP address before it gets
        # anywhere near a command line (raises ValueError -> "" below).
        target = str(ipaddress.ip_address(str(ip_address).strip()))

        # Argument list + no shell: the address is passed as a single argv entry.
        result = subprocess.run(
            ["arp", "-a", target],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode != 0 or not result.stdout:
            return ""
        output = result.stdout

        # Stage 1: hyphen/colon separated octets; stage 2: whitespace/hyphen
        # separated octets. First hit anywhere in the output wins.
        mac_patterns = (
            r"([0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2})",
            r"([0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2})",
        )
        for pattern in mac_patterns:
            mac_match = re.search(pattern, output, re.IGNORECASE)
            if mac_match:
                # Upper-case for consistent display.
                return mac_match.group(0).upper()

        # Stage 3: on the line mentioning the address, accept the second
        # column if it looks like six hex octets with any non-word separator.
        for line in output.strip().split("\n"):
            if target not in line:
                continue
            parts = line.split()
            if len(parts) >= 2 and re.match(
                r"([0-9a-f]{2}[^\w]){5}[0-9a-f]{2}",
                parts[1],
                re.IGNORECASE,
            ):
                return parts[1].upper()

        # No MAC found: report "" without logging.
        return ""
    except Exception:
        # Deliberately best-effort: invalid input, a missing arp binary,
        # a timeout or a decoding problem all mean "no MAC available".
        return ""
|
|
|
|
def get_mac_vendor(self, mac_address):
    """Look up the vendor name for a MAC address.

    Args:
        mac_address: MAC address string; spaces, hyphens and colons between
            octets are accepted. May be "" or None.

    Returns:
        The vendor string returned by ``self.mac_lookup.lookup``, or ""
        when ``mac_address`` is empty, the lookup library is unavailable
        (``MAC_LOOKUP_AVAILABLE`` is false), or the lookup raises. Lookup
        errors are reported through ``log_message`` and never raised.
    """
    # An empty MAC simply means none was found for the host; skip the
    # lookup instead of letting it fail and write a spurious error to the log.
    if not mac_address:
        return ""

    try:
        if not MAC_LOOKUP_AVAILABLE:
            return ""

        # Normalise to bare upper-case hex digits...
        clean_mac = (
            mac_address.replace(" ", "").replace("-", "").replace(":", "").upper()
        )
        # ...then re-join as colon-separated octets for the lookup call.
        formatted_mac = ":".join(
            clean_mac[i : i + 2] for i in range(0, len(clean_mac), 2)
        )

        # NOTE(review): relies on the instance attribute ``mac_lookup`` being
        # set elsewhere in the class - confirm it is assigned in __init__.
        return self.mac_lookup.lookup(formatted_mac)
    except Exception as e:
        self.log_message(f"Error looking up MAC vendor: {str(e)}")
        return ""
|
|
|
|
|
|
def main():
    """Create the Tk root window, build the application on it, and run the event loop."""
    window = tk.Tk()
    # The instance is bound to a local name for the duration of the loop.
    application = IPChangerApp(window)
    # Blocks until the root window is closed.
    window.mainloop()
|
|
|
|
|
|
# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()
|