# IPchangeNG/menu-ip-change.py
#
# 1796 lines
# 68 KiB
# Python

import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import subprocess
import re
import ctypes
import sys
import ipaddress
from typing import List, Optional, Dict
from dataclasses import dataclass
import threading
import time
import socket
import queue
import struct
# Optional third-party helpers (ICMP ping, SNMP, WMI). Each import is attempted
# once at load time; a module-level flag records whether it succeeded and a
# hint is printed when it did not.
try:
    from pythonping import ping as pythonping
except ImportError:
    PING_LIBRARY_AVAILABLE = False
    print(
        "PythonPing module not found. Install with 'pip install pythonping' for better ping functionality."
    )
else:
    PING_LIBRARY_AVAILABLE = True

try:
    from pysnmp.hlapi import *
except ImportError:
    SNMP_AVAILABLE = False
    print(
        "pysnmp module not found. Install with 'pip install pysnmp' for SNMP functionality."
    )
else:
    SNMP_AVAILABLE = True

try:
    import wmi
except ImportError:
    WMI_AVAILABLE = False
    print(
        "WMI module not found. Using alternative method for network interface detection."
    )
else:
    WMI_AVAILABLE = True
@dataclass
class NetworkInterface:
    """Record describing one network adapter and its IPv4 settings."""

    # Identification (required)
    name: str
    description: str

    # IPv4 settings; each defaults to an empty string
    ip_address: str = ""
    subnet_mask: str = ""
    gateway: str = ""

    # Status flags; both default to True
    dhcp_enabled: bool = True
    active: bool = True

    # Adapter identifier; defaults to an empty string
    adapter_id: str = ""
class IPChangeConfig:
    """Persist application settings and per-prefix ping targets as JSON files.

    The three file names are relative, so they are resolved against the
    process's current working directory.
    """

    def __init__(self):
        self.config_file = "ip_config.json"
        self.history_file = "ip_history.json"
        self.ping_targets_file = "ping_targets.json"  # ping target per IP prefix
        self.load_config()
        self.load_ping_targets()

    def load_config(self):
        """Load ``last_interface``, ``last_ip_prefix`` and ``previous_config``.

        A missing, unreadable or malformed file leaves all three attributes
        at their defaults (empty string / empty dict); the error is printed.
        """
        loaded = {}
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, "r") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("configuration root must be a JSON object")
        except Exception as e:
            print(f"Error loading config: {e}")
            loaded = {}
        self.last_interface = loaded.get("last_interface", "")
        self.last_ip_prefix = loaded.get("last_ip_prefix", "")
        self.previous_config = loaded.get("previous_config", {})

    def save_config(self):
        """Write the current settings to ``config_file``; errors are printed."""
        try:
            config = {
                "last_interface": self.last_interface,
                "last_ip_prefix": self.last_ip_prefix,
                "previous_config": self.previous_config,
            }
            with open(self.config_file, "w") as f:
                json.dump(config, f)
        except Exception as e:
            print(f"Error saving config: {e}")

    def load_ping_targets(self):
        """Load the mapping of IP prefix -> ping target.

        Falls back to an empty mapping when the file is missing, unreadable
        or does not contain a JSON object.
        """
        targets = {}
        try:
            if os.path.exists(self.ping_targets_file):
                with open(self.ping_targets_file, "r") as f:
                    targets = json.load(f)
                if not isinstance(targets, dict):
                    raise ValueError("ping targets root must be a JSON object")
        except Exception as e:
            print(f"Error loading ping targets: {e}")
            targets = {}
        self.ping_targets = targets

    def save_ping_targets(self):
        """Write the prefix -> target mapping to disk; errors are printed."""
        try:
            with open(self.ping_targets_file, "w") as f:
                json.dump(self.ping_targets, f)
        except Exception as e:
            print(f"Error saving ping targets: {e}")

    @staticmethod
    def _is_octet_prefix(shorter, longer):
        """Return True when ``shorter`` is a leading run of whole octets of ``longer``."""
        shorter = shorter.rstrip(".")
        # Requiring the trailing dot stops "192.168.1" from matching "192.168.10"
        return bool(shorter) and longer.startswith(shorter + ".")

    def get_ping_target(self, ip_prefix):
        """Return the ping target stored for ``ip_prefix``.

        Lookup order: an exact key match, then the first stored prefix that
        contains (or is contained in) ``ip_prefix`` on an octet boundary, and
        finally the default ``"<ip_prefix>.1"``. An empty ``ip_prefix``
        yields an empty string.
        """
        if not ip_prefix:
            return ""
        if ip_prefix in self.ping_targets:
            return self.ping_targets[ip_prefix]
        for prefix, target in self.ping_targets.items():
            if self._is_octet_prefix(prefix, ip_prefix) or self._is_octet_prefix(
                ip_prefix, prefix
            ):
                return target
        # Default target: IP prefix + ".1" (usually the gateway)
        return f"{ip_prefix}.1"

    def set_ping_target(self, ip_prefix, target):
        """Store ``target`` for ``ip_prefix`` and save; no-op if either is empty."""
        if ip_prefix and target:
            self.ping_targets[ip_prefix] = target
            self.save_ping_targets()
class IPChangerApp:
def __init__(self, master):
    """Set up the main window, load persisted state and build the UI.

    ``master`` is the Tk container that hosts the application.
    """
    self.master = master
    master.title("Network Interface IP Configuration")
    master.geometry("900x700")

    # Settings come first: the widgets read values from them
    self.config = IPChangeConfig()

    # In-memory data structures
    self.interfaces: List[NetworkInterface] = []
    self.ip_history: List[str] = []

    # Continuous-ping state
    self.continuous_ping = tk.BooleanVar(value=False)
    self.ping_running = False
    self.ping_stop_event = threading.Event()

    # Network-scan state
    self.scan_results = []
    self.scan_running = False
    self.scan_stop_event = threading.Event()
    self.scan_queue = queue.Queue()
    self.scan_threads = []
    self.scan_progress_var = tk.IntVar(value=0)

    # WMI client is only created when the wmi module was importable
    if WMI_AVAILABLE:
        self.wmi_client = wmi.WMI()

    # Saved history is loaded before the widgets are created
    self.load_ip_history()

    # Tabbed user interface
    self.create_widgets()

    # Trace bookkeeping, then hook up the subnet-mask/CIDR traces
    self.subnet_trace_id = None
    self.cidr_trace_id = None
    self.setup_mask_traces()

    # Fill the history combobox and the interface list
    self.update_history_display()
    self.refresh_interfaces()

    # Grid weights: row 1 (the log) absorbs extra vertical space
    master.grid_columnconfigure(0, weight=1)
    master.grid_rowconfigure(1, weight=1)
def setup_mask_traces(self):
    """Attach write traces to the subnet-mask and CIDR variables.

    Each variable is first re-set to its own current value, then a "w"
    trace is registered with the matching change handler.
    """
    traced = (
        (self.subnet_mask, self.on_subnet_mask_changed),
        (self.cidr_bits, self.on_cidr_bits_changed),
    )

    # Re-assign the current values before any trace exists
    for variable, _handler in traced:
        variable.set(variable.get())

    # Register the write traces
    for variable, handler in traced:
        variable.trace("w", handler)

    self.log_message("Subnet mask traces initialized")
def remove_mask_traces(self):
    """Intentional no-op placeholder; this method removes nothing."""
    return None
def get_ip_prefix(self, ip: str) -> str:
    """Return the first three dot-separated fields of ``ip``.

    Input with fewer than three fields is returned unchanged.
    """
    octets = ip.split(".")
    if len(octets) < 3:
        return ip
    return ".".join(octets[:3])
def create_widgets(self):
    """Build the two-tab notebook and the operation log shared by both tabs."""
    root = self.master

    # Notebook (tabbed container) in grid row 0
    notebook = ttk.Notebook(root)
    self.notebook = notebook
    notebook.grid(row=0, column=0, sticky="nsew", padx=10, pady=10)

    # One frame per tab
    self.ip_setup_tab = ttk.Frame(notebook, padding="10")
    self.tools_tab = ttk.Frame(notebook, padding="10")
    notebook.add(self.ip_setup_tab, text="IP Setup")
    notebook.add(self.tools_tab, text="Tools")

    # Fill each tab
    self.create_ip_setup_tab()
    self.create_tools_tab()

    # Operation log in grid row 1, below the notebook
    log_frame = ttk.LabelFrame(root, text="Operation Log", padding="5")
    self.log_frame = log_frame
    log_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10))

    log_text = tk.Text(log_frame, wrap="none", height=10)
    self.log_text = log_text
    self.log_scrollbar_y = ttk.Scrollbar(
        log_frame, orient="vertical", command=log_text.yview
    )
    self.log_scrollbar_x = ttk.Scrollbar(
        log_frame, orient="horizontal", command=log_text.xview
    )
    log_text.configure(
        yscrollcommand=self.log_scrollbar_y.set,
        xscrollcommand=self.log_scrollbar_x.set,
    )

    log_text.grid(row=0, column=0, sticky="nsew")
    self.log_scrollbar_y.grid(row=0, column=1, sticky="ns")
    self.log_scrollbar_x.grid(row=1, column=0, sticky="ew")

    clear_button = ttk.Button(log_frame, text="Clear Log", command=self.clear_log)
    clear_button.grid(row=2, column=0, columnspan=2, pady=5)

    # The text widget's cell takes all extra space inside the log frame
    log_frame.grid_columnconfigure(0, weight=1)
    log_frame.grid_rowconfigure(0, weight=1)
def create_ip_setup_tab(self):
    """Populate the "IP Setup" tab.

    Sections, top to bottom: interface selector, current configuration,
    IP configuration inputs, IP history and the action buttons.
    """
    # Container that stacks the sections vertically
    control = ttk.Frame(self.ip_setup_tab)
    self.control_frame = control
    control.grid(row=0, column=0, sticky="new")
    control.columnconfigure(0, weight=1)

    # --- Interface selection ---
    interfaces_box = ttk.LabelFrame(control, text="Network Interfaces", padding="5")
    self.interfaces_frame = interfaces_box
    interfaces_box.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    ttk.Label(interfaces_box, text="Select Interface:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.if_var = tk.StringVar()
    self.if_combo = ttk.Combobox(
        interfaces_box, textvariable=self.if_var, state="readonly", width=50
    )
    self.if_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.if_combo.bind("<<ComboboxSelected>>", self.on_interface_selected)

    # --- Current configuration (read-only display) ---
    current_box = ttk.LabelFrame(
        control, text="Current Configuration", padding="5"
    )
    self.current_frame = current_box
    current_box.grid(row=1, column=0, sticky="ew", pady=(0, 10))
    self.current_ip_var = tk.StringVar()
    self.current_mask_var = tk.StringVar()
    self.current_gateway_var = tk.StringVar()
    self.dhcp_status_var = tk.StringVar()

    # (caption, variable shown, copy handler or None) - one grid row each
    display_rows = (
        ("Current IP:", self.current_ip_var, self.copy_current_ip),
        ("Subnet Mask:", self.current_mask_var, None),
        ("Gateway:", self.current_gateway_var, self.copy_current_gateway),
        ("DHCP Status:", self.dhcp_status_var, None),
    )
    for row, (caption, value_var, copy_handler) in enumerate(display_rows):
        ttk.Label(current_box, text=caption).grid(
            row=row, column=0, sticky="w", padx=5
        )
        ttk.Label(current_box, textvariable=value_var).grid(
            row=row, column=1, sticky="w"
        )
        if copy_handler is not None:
            ttk.Button(
                current_box, text="Copy", width=5, command=copy_handler
            ).grid(row=row, column=2, padx=5)

    # --- IP configuration inputs ---
    ip_box = ttk.LabelFrame(control, text="IP Configuration", padding="5")
    self.ip_frame = ip_box
    ip_box.grid(row=2, column=0, sticky="ew", pady=(0, 10))

    ttk.Label(ip_box, text="IP Prefix (first 3 octets):").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.ip_prefix = tk.StringVar(value=self.config.last_ip_prefix)
    self.ip_entry = ttk.Entry(ip_box, textvariable=self.ip_prefix, width=30)
    self.ip_entry.grid(row=0, column=1, sticky="w", padx=5)

    ttk.Label(ip_box, text="Last Octet:").grid(
        row=0, column=2, sticky="w", padx=5
    )
    self.last_octet = tk.StringVar(value="249")
    self.last_octet_entry = ttk.Entry(
        ip_box, textvariable=self.last_octet, width=5
    )
    self.last_octet_entry.grid(row=0, column=3, sticky="w", padx=5)

    # Subnet mask, editable as dotted quad or as CIDR bits
    ttk.Label(ip_box, text="Subnet Mask:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    self.subnet_mask = tk.StringVar(value="255.255.255.0")
    self.subnet_entry = ttk.Entry(ip_box, textvariable=self.subnet_mask, width=15)
    self.subnet_entry.grid(row=1, column=1, sticky="w", padx=5)

    ttk.Label(ip_box, text="CIDR (/bits):").grid(
        row=1, column=2, sticky="w", padx=5
    )
    self.cidr_bits = tk.StringVar(value="24")
    self.cidr_entry = ttk.Entry(ip_box, textvariable=self.cidr_bits, width=5)
    self.cidr_entry.grid(row=1, column=3, sticky="w", padx=5)

    # --- IP history ---
    history_box = ttk.LabelFrame(control, text="IP History", padding="5")
    self.history_frame = history_box
    history_box.grid(row=3, column=0, sticky="ew", pady=(0, 10))
    ttk.Label(history_box, text="Previous IPs:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.history_var = tk.StringVar()
    self.history_combo = ttk.Combobox(
        history_box,
        textvariable=self.history_var,
        state="readonly",
        width=50,
    )
    self.history_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.history_combo.bind("<<ComboboxSelected>>", self.on_history_selected)

    # --- Action buttons, left to right ---
    button_bar = ttk.Frame(control)
    self.button_frame = button_bar
    button_bar.grid(row=4, column=0, sticky="ew", pady=(0, 10))
    actions = (
        ("Set Static IP", self.set_static_ip),
        ("Enable DHCP", self.set_dhcp),
        ("Restore Previous", self.restore_previous),
        ("Refresh Interfaces", self.refresh_interfaces),
    )
    for column, (caption, handler) in enumerate(actions):
        ttk.Button(button_bar, text=caption, command=handler).grid(
            row=0, column=column, padx=5
        )
def create_tools_tab(self):
    """Populate the "Tools" tab with the ping tool and the network scanner.

    The scanner section holds the range inputs (start/end IP and CIDR), a
    read-only node counter, the scan buttons, a progress bar and a
    three-column results tree (IP, hostname, MAC).
    """
    # Container that stacks the sections vertically
    control = ttk.Frame(self.tools_tab)
    self.tools_control_frame = control
    control.grid(row=0, column=0, sticky="new")
    control.columnconfigure(0, weight=1)

    # --- Ping tool ---
    ping_box = ttk.LabelFrame(control, text="Ping Tool", padding="5")
    self.ping_frame = ping_box
    ping_box.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    ttk.Label(ping_box, text="Target IP/Host:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.ping_target = tk.StringVar()
    self.ping_entry = ttk.Entry(ping_box, textvariable=self.ping_target, width=40)
    self.ping_entry.grid(row=0, column=1, sticky="w", padx=5)
    # Leaving the entry saves the ping target
    self.ping_entry.bind("<FocusOut>", self.save_current_ping_target)

    self.continuous_ping_check = ttk.Checkbutton(
        ping_box, text="Continuous Ping", variable=self.continuous_ping
    )
    self.continuous_ping_check.grid(row=0, column=2, padx=5)
    ttk.Button(ping_box, text="Ping", command=self.do_ping).grid(
        row=0, column=3, padx=5
    )
    ttk.Button(ping_box, text="Stop", command=self.stop_ping).grid(
        row=0, column=4, padx=5
    )

    # --- Network scan ---
    scan_box = ttk.LabelFrame(control, text="Network Scan", padding="5")
    self.scan_frame = scan_box
    scan_box.grid(row=1, column=0, sticky="ew", pady=(0, 10))

    # Row 0: start IP, end IP, CIDR bits
    ttk.Label(scan_box, text="Start IP:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.scan_start_ip = tk.StringVar()
    self.scan_start_entry = ttk.Entry(
        scan_box, textvariable=self.scan_start_ip, width=15
    )
    self.scan_start_entry.grid(row=0, column=1, sticky="w", padx=5)

    ttk.Label(scan_box, text="End IP:").grid(row=0, column=2, sticky="w", padx=5)
    self.scan_end_ip = tk.StringVar()
    self.scan_end_entry = ttk.Entry(
        scan_box, textvariable=self.scan_end_ip, width=15
    )
    self.scan_end_entry.grid(row=0, column=3, sticky="w", padx=5)

    ttk.Label(scan_box, text="CIDR (/bits):").grid(
        row=0, column=4, sticky="w", padx=5
    )
    self.scan_cidr_bits = tk.StringVar(value="24")
    self.scan_cidr_entry = ttk.Entry(
        scan_box, textvariable=self.scan_cidr_bits, width=5
    )
    self.scan_cidr_entry.grid(row=0, column=5, sticky="w", padx=5)
    # Writing to the CIDR field triggers update_scan_range_from_cidr
    self.scan_cidr_bits.trace("w", self.update_scan_range_from_cidr)

    # Row 1: node counter (read-only) and the scan buttons
    ttk.Label(scan_box, text="Nodes to scan:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    self.nodes_to_scan = tk.StringVar(value="0")
    nodes_display = ttk.Entry(
        scan_box, textvariable=self.nodes_to_scan, width=10, state="readonly"
    )
    nodes_display.grid(row=1, column=1, sticky="w", padx=5)

    scan_buttons = ttk.Frame(scan_box)
    self.scan_buttons_frame = scan_buttons
    scan_buttons.grid(row=1, column=2, columnspan=4, padx=5, sticky="e")
    scan_actions = (
        ("Start Scan", self.start_scan),
        ("Stop Scan", self.stop_scan),
        ("Get Host Info", self.gather_host_information),
    )
    for column, (caption, handler) in enumerate(scan_actions):
        ttk.Button(scan_buttons, text=caption, command=handler).grid(
            row=0, column=column, padx=5
        )

    # Row 2: progress bar bound to scan_progress_var
    self.scan_progress = ttk.Progressbar(
        scan_box,
        orient="horizontal",
        length=300,
        mode="determinate",
        variable=self.scan_progress_var,
    )
    self.scan_progress.grid(
        row=2, column=0, columnspan=6, sticky="ew", padx=5, pady=5
    )

    # Row 3: caption; row 4: results tree with its scrollbar
    ttk.Label(scan_box, text="Scan Results:").grid(
        row=3, column=0, sticky="w", padx=5
    )
    results_box = ttk.Frame(scan_box)
    self.results_frame = results_box
    results_box.grid(row=4, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
    results_box.columnconfigure(0, weight=1)
    results_box.rowconfigure(0, weight=1)

    tree = ttk.Treeview(
        results_box,
        columns=("ip", "hostname", "mac"),
        show="headings",
        height=10,
    )
    self.scan_results_tree = tree
    tree.heading("ip", text="IP Address")
    tree.heading("hostname", text="Hostname")
    tree.heading("mac", text="MAC Address")
    tree.column("ip", width=120, anchor="w")
    tree.column("hostname", width=200, anchor="w")
    tree.column("mac", width=150, anchor="w")

    self.scan_results_scrollbar = ttk.Scrollbar(
        results_box, orient="vertical", command=tree.yview
    )
    tree.configure(yscrollcommand=self.scan_results_scrollbar.set)
    tree.grid(row=0, column=0, sticky="nsew")
    self.scan_results_scrollbar.grid(row=0, column=1, sticky="ns")

    # Stretch weights: the two IP entry columns share extra width, and the
    # results row (row 4, where results_frame is gridded) takes extra height.
    scan_box.columnconfigure(1, weight=1)
    scan_box.columnconfigure(3, weight=1)
    scan_box.rowconfigure(4, weight=1)
def clear_log(self):
    """Remove all text from the operation log widget."""
    log_widget = self.log_text
    log_widget.delete("1.0", tk.END)
def log_message(self, message: str):
    """Append ``message`` as a new line to the operation log and scroll to it.

    When the log widget does not exist yet the message is printed to
    standard output instead. ``__init__`` calls ``load_ip_history`` before
    ``create_widgets``, so an error logged at that point would otherwise
    raise ``AttributeError``.
    """
    log_widget = getattr(self, "log_text", None)
    if log_widget is None:
        print(message)
        return
    log_widget.insert(tk.END, f"{message}\n")
    log_widget.see(tk.END)
    log_widget.update_idletasks()
def load_ip_history(self):
    """Read the IP history list from ``self.config.history_file``.

    ``self.ip_history`` becomes the decoded JSON content, or an empty list
    when the file does not exist or loading fails (the error is logged).
    """
    history_path = self.config.history_file
    loaded = []
    try:
        if os.path.exists(history_path):
            with open(history_path, "r") as f:
                loaded = json.load(f)
    except Exception as e:
        self.log_message(f"Error loading IP history: {e}")
        loaded = []
    self.ip_history = loaded
def save_ip_history(self):
    """Write ``self.ip_history`` as JSON to ``self.config.history_file``.

    Any failure is reported through the operation log, not raised.
    """
    history_path = self.config.history_file
    try:
        with open(history_path, "w") as f:
            json.dump(self.ip_history, f)
    except Exception as e:
        self.log_message(f"Error saving IP history: {e}")
def add_to_history(self, ip_prefix: str):
    """Put ``ip_prefix`` at the front of the history if it is new.

    Empty or already-present prefixes are ignored. The list is capped at
    15 entries, then saved and pushed to the history combobox.
    """
    if not ip_prefix or ip_prefix in self.ip_history:
        return
    self.ip_history.insert(0, ip_prefix)
    # Keep only the 15 most recent entries
    self.ip_history = self.ip_history[:15]
    self.save_ip_history()
    self.update_history_display()
def update_history_display(self):
    """Load the history into the combobox and select its first entry, if any."""
    entries = self.ip_history
    self.history_combo["values"] = entries
    if not entries:
        return
    self.history_combo.set(entries[0])
def on_history_selected(self, event):
    """Apply the prefix chosen in the history combobox.

    Only the first three dot-separated fields are used. The prefix is
    written to the prefix entry, saved as ``last_ip_prefix``, logged, and
    passed to ``update_ping_target``. Selections with fewer than three
    fields are ignored.
    """
    chosen = self.history_var.get()
    if not chosen:
        return

    fields = chosen.split(".")
    if len(fields) < 3:
        return

    ip_prefix = ".".join(fields[:3])
    self.ip_prefix.set(ip_prefix)
    self.config.last_ip_prefix = ip_prefix
    self.config.save_config()
    self.log_message(f"Selected IP prefix from history: {ip_prefix}")
    # Keep the ping target in step with the new prefix
    self.update_ping_target(ip_prefix)
def refresh_interfaces(self):
    """Rebuild ``self.interfaces`` and refresh the interface combobox.

    Uses WMI when available, otherwise netsh. The last used interface is
    re-selected when it is among the active ones. Failures are logged and
    shown through ``show_error``.
    """
    self.interfaces.clear()
    try:
        reload_interfaces = (
            self._refresh_interfaces_wmi
            if WMI_AVAILABLE
            else self._refresh_interfaces_netsh
        )
        reload_interfaces()

        # Only active interfaces are offered in the combobox
        active_names = [inf.name for inf in self.interfaces if inf.active]
        self.if_combo["values"] = active_names

        # Re-select the last used interface when it is still listed
        last_used = self.config.last_interface
        if last_used in self.if_combo["values"]:
            self.if_combo.set(last_used)
            self.on_interface_selected()

        self.log_message("Interfaces refreshed successfully")
    except Exception as e:
        failure = f"Error refreshing interfaces: {str(e)}"
        self.log_message(failure)
        self.show_error(failure)
def _refresh_interfaces_wmi(self):
    """Append one ``NetworkInterface`` per IP-enabled physical adapter via WMI.

    Each adapter is paired with the first adapter configuration that has
    the same ``InterfaceIndex``. Adapters without one, or whose
    configuration is not IP-enabled, are skipped. Any error is re-raised
    as ``Exception`` with a "WMI refresh error" prefix.
    """

    def first_or_blank(values):
        # First element of a WMI list property, or "" when empty/None
        return values[0] if values else ""

    try:
        adapters = self.wmi_client.Win32_NetworkAdapter(PhysicalAdapter=True)
        configs = self.wmi_client.Win32_NetworkAdapterConfiguration()

        for adapter in adapters:
            config = None
            for candidate in configs:
                if candidate.InterfaceIndex == adapter.InterfaceIndex:
                    config = candidate
                    break

            if not (config and config.IPEnabled):
                continue

            self.interfaces.append(
                NetworkInterface(
                    name=adapter.NetConnectionID,
                    description=adapter.Description,
                    ip_address=first_or_blank(config.IPAddress),
                    subnet_mask=first_or_blank(config.IPSubnet),
                    gateway=first_or_blank(config.DefaultIPGateway),
                    dhcp_enabled=config.DHCPEnabled,
                    active=adapter.NetEnabled,
                    adapter_id=adapter.GUID,
                )
            )
    except Exception as e:
        raise Exception(f"WMI refresh error: {str(e)}")
def _refresh_interfaces_netsh(self):
    """Append one ``NetworkInterface`` per block of ``netsh`` IPv4 output.

    The output of ``netsh interface ipv4 show config`` is split on blank
    lines and each block is parsed with the Spanish-language field labels
    below. Blocks without a recognisable interface name are skipped.
    Missing fields become empty strings; a missing DHCP line is treated as
    DHCP enabled.

    Raises:
        Exception: if netsh exits with an error or the output cannot be
            parsed (the original error is chained as the cause).
    """
    ipv4 = r"(\d+\.\d+\.\d+\.\d+)"
    # Values of the "DHCP habilitado" field that mean "enabled"
    affirmative = ("sí", "si", "yes")

    def first_group(pattern, text):
        # Captured group 1 of the first match, or "" when there is none
        found = re.search(pattern, text)
        return found.group(1) if found else ""

    try:
        output = subprocess.check_output(
            ["netsh", "interface", "ipv4", "show", "config"], text=True
        )
        for config in output.split("\n\n"):
            if not config.strip():
                continue

            name = first_group(r"Configuración de\s+\"(.+?)\"", config)
            if not name:
                continue

            dhcp_match = re.search(r"DHCP habilitado:\s+(\w+)", config)
            # The previous comparison against "" could never be true because
            # \w+ always captures at least one character.
            dhcp_enabled = (
                dhcp_match.group(1).lower() in affirmative if dhcp_match else True
            )

            self.interfaces.append(
                NetworkInterface(
                    name=name,
                    description=name,
                    ip_address=first_group(r"Dirección IP:\s+" + ipv4, config),
                    subnet_mask=first_group(
                        r"Máscara de subred:\s+" + ipv4, config
                    ),
                    gateway=first_group(
                        r"Puerta de enlace predeterminada:\s+" + ipv4, config
                    ),
                    dhcp_enabled=dhcp_enabled,
                    active=True,
                )
            )
    except subprocess.CalledProcessError as e:
        raise Exception(f"Error executing netsh: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Error parsing netsh output: {str(e)}") from e
def on_interface_selected(self, event=None):
    """Show the selected interface's settings and remember the selection.

    Updates the "Current Configuration" fields. When the interface has an
    IP address, the prefix entry and the ping target are updated from it.
    The selection is saved, and the scan range is updated when both
    address and mask are known. Nothing happens if the selected name
    matches no known interface.
    """
    selected = self.if_var.get()
    interface = None
    for candidate in self.interfaces:
        if candidate.name == selected:
            interface = candidate
            break
    if not interface:
        return

    address = interface.ip_address
    mask = interface.subnet_mask

    self.current_ip_var.set(address)
    self.current_mask_var.set(mask)
    self.current_gateway_var.set(interface.gateway)
    dhcp_label = "Enabled" if interface.dhcp_enabled else "Disabled"
    self.dhcp_status_var.set(dhcp_label)

    # Derive the prefix (first three fields) from the address, if there is one
    if address:
        prefix = ".".join(address.split(".")[:3])
        self.ip_prefix.set(prefix)
        self.update_ping_target(prefix)

    # Remember the selected interface
    self.config.last_interface = selected
    self.config.save_config()
    self.log_message(f"Selected interface: {selected}")

    # Update the scan range from the current address and mask
    if address and mask:
        self.update_scan_ip_range(address, mask)
def validate_ip_input(self) -> tuple[bool, str]:
    """Validate the prefix, last-octet and subnet-mask entries.

    Returns:
        ``(True, ip)`` with the assembled dotted address when every field
        is valid, otherwise ``(False, message)`` describing the first
        problem found.
    """
    try:
        prefix = self.ip_prefix.get().strip()
        last_octet = self.last_octet.get().strip()
        subnet_mask = self.subnet_mask.get().strip()

        # The whole prefix must be exactly three numeric fields. An
        # unanchored match let "1.2.3.4" or "1.2.3x" through, only to fail
        # later with a less helpful "Invalid IP address" message.
        if not re.fullmatch(r"\d{1,3}\.\d{1,3}\.\d{1,3}", prefix):
            return False, "IP prefix must be in format: xxx.xxx.xxx"

        # isascii() keeps non-ASCII digit characters away from int()
        if not (last_octet.isascii() and last_octet.isdigit()) or not (
            0 <= int(last_octet) <= 255
        ):
            return False, "Last octet must be between 0 and 255"

        if not self.is_valid_subnet_mask(subnet_mask):
            return False, "Invalid subnet mask format"

        # Assemble the address and let ipaddress do the final range check
        ip = f"{prefix}.{last_octet}"
        ipaddress.IPv4Address(ip)
        return True, ip
    except ValueError as e:
        return False, f"Invalid IP address: {str(e)}"
def execute_admin_script(
    self,
    interface: str,
    mode: str,
    debug: bool = True,
    subnet_mask: str = "255.255.255.0",
) -> bool:
    """Launch ``ip-changer-admin.py`` elevated to reconfigure ``interface``.

    Args:
        interface: Name of the network interface to configure.
        mode: Either the literal ``"dhcp"`` or the static IP address to set.
        debug: When true, log the resolved paths and the full command.
        subnet_mask: Mask used for a static address; ignored for DHCP.

    Returns:
        True when ``ShellExecuteW`` returns a value greater than 32,
        False otherwise. Only the launch result is checked here; the
        script's own outcome is not inspected.
    """
    # The admin script is expected next to this file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(current_dir, "ip-changer-admin.py")
    python_exe = sys.executable

    if not os.path.exists(script_path):
        self.log_message(f"Admin script not found at: {script_path}")
        return False

    if mode != "dhcp":
        # Static IP: the gateway is assumed to be "<first three octets>.1"
        gateway = f"{self.get_ip_prefix(mode)}.1"
        netsh_cmd = f'netsh interface ip set address "{interface}" static {mode} {subnet_mask} {gateway}'
        # NOTE(review): netsh_cmd contains unescaped double quotes and is
        # itself wrapped in double quotes below. Confirm how
        # ip-changer-admin.py receives this argument when the interface
        # name contains spaces.
        args = f'"{script_path}" "{interface}" "{netsh_cmd}"'
    else:
        args = f'"{script_path}" "{interface}" "dhcp"'

    if debug:
        self.log_message("Debug Information:")
        self.log_message(f"Current Directory: {current_dir}")
        self.log_message(f"Script Path: {script_path}")
        self.log_message(f"Python Executable: {python_exe}")
        self.log_message(f"Full Command: {python_exe} {args}")

    # Descriptions for ShellExecuteW return codes of 32 or below
    error_codes = {
        0: "The operating system is out of memory or resources",
        2: "The specified file was not found",
        3: "The specified path was not found",
        5: "Access denied",
        8: "Insufficient memory to complete the operation",
        11: "Bad format",
        26: "A sharing violation occurred",
        27: "The file name association is incomplete or invalid",
        28: "The DDE operation timed out",
        29: "The DDE operation failed",
        30: "The DDE operation is busy",
        31: "The file name association is unavailable",
        32: "No application is associated with the file",
    }

    try:
        self.log_message(
            "Attempting to execute admin script with elevated privileges"
        )
        ret = ctypes.windll.shell32.ShellExecuteW(
            None,  # hwnd
            "runas",  # operation (request elevation)
            python_exe,  # file
            args,  # parameters
            current_dir,  # directory
            1,  # show command
        )
        result_code = int(ret)
        if result_code > 32:
            self.log_message(
                f"ShellExecuteW successful (return code: {result_code})"
            )
            return True

        # Previously this message referenced an undefined name, so the real
        # failure reason was replaced by a NameError caught below.
        error_msg = error_codes.get(result_code, "Unknown error")
        self.log_message(
            f"Failed to execute admin script. Error code {result_code}: {error_msg}"
        )
        return False
    except Exception as e:
        self.log_message(f"Exception while executing admin script: {str(e)}")
        return False
def save_previous_config(self, interface_name: str):
    """Save the named interface's current settings before they are changed.

    Stores name, address, mask, gateway and DHCP flag in
    ``config.previous_config`` and saves the configuration. Does nothing
    when no interface has that name.
    """
    matches = [inf for inf in self.interfaces if inf.name == interface_name]
    if not matches:
        return
    interface = matches[0]

    snapshot = {}
    for key in ("name", "ip_address", "subnet_mask", "gateway", "dhcp_enabled"):
        snapshot[key] = getattr(interface, key)

    self.config.previous_config = snapshot
    self.config.save_config()
    self.log_message(f"Previous configuration saved for {interface.name}")
def restore_previous(self):
    """Re-apply the configuration stored by ``save_previous_config``.

    Switches the saved interface back to DHCP or to its saved static
    address. The saved subnet mask is passed to the admin script;
    255.255.255.0 is used only when no mask was stored. Problems are
    reported through ``show_error``.
    """
    prev_config = self.config.previous_config
    if not prev_config:
        self.show_error("No previous configuration available")
        return

    interface_name = prev_config.get("name")
    if not interface_name:
        self.show_error("Invalid previous configuration")
        return

    # The saved interface must still be present
    if interface_name not in [inf.name for inf in self.interfaces]:
        self.show_error(f"Interface {interface_name} not found")
        return

    self.log_message(f"Restoring previous configuration for {interface_name}")

    if prev_config.get("dhcp_enabled", True):
        # Previous configuration was DHCP
        if self.execute_admin_script(interface_name, "dhcp"):
            self.log_message(f"Successfully restored DHCP on {interface_name}")
            time.sleep(2)
            self.refresh_interfaces()
        else:
            self.show_error("Failed to restore DHCP configuration")
        return

    # Previous configuration was a static address
    ip = prev_config.get("ip_address")
    if not ip:
        self.show_error("Previous IP address not available")
        return

    # Restore the saved mask, not the /24 default.
    # NOTE(review): the saved gateway is not restored, because
    # execute_admin_script derives the gateway from the address.
    mask = prev_config.get("subnet_mask") or "255.255.255.0"
    if self.execute_admin_script(interface_name, ip, subnet_mask=mask):
        self.log_message(f"Successfully restored IP {ip} on {interface_name}")
        time.sleep(2)
        self.refresh_interfaces()
    else:
        self.show_error("Failed to restore static IP configuration")
def do_ping(self):
    """Start a ping to the target in the ping entry on a daemon thread.

    An empty target is reported through ``show_error``. With "Continuous
    Ping" ticked, the stop event is cleared and a continuous ping is
    started; otherwise a single four-request ping runs.
    """
    target = self.ping_target.get().strip()
    if not target:
        self.show_error("Please enter a target IP or hostname")
        return

    # Save the target whenever it is actually used
    self.save_current_ping_target()

    if self.continuous_ping.get():
        self.log_message(f"Starting continuous ping to {target}...")
        # Clear the stop event before marking the ping as running
        self.ping_stop_event.clear()
        self.ping_running = True
        worker = self._execute_continuous_ping
    else:
        self.log_message(f"Pinging {target}...")
        worker = self._execute_ping

    # Run in a background thread
    threading.Thread(target=worker, args=(target,), daemon=True).start()
def stop_ping(self):
    """Signal a running continuous ping to stop; no-op when none is running."""
    if not self.ping_running:
        return
    self.log_message("Stopping continuous ping...")
    self.ping_stop_event.set()
    self.ping_running = False
def _execute_continuous_ping(self, target: str):
    """Ping ``target`` about once per second until the stop event is set.

    Uses pythonping when available. Otherwise it falls back to timing a
    TCP connection attempt to port 80, which is not an ICMP ping. Every
    attempt is logged with its sequence number. ``ping_running`` is
    cleared when the loop ends for any reason.
    """
    try:
        self.log_message(
            f"Continuous ping to {target} started. Press Stop to terminate."
        )
        count = 0
        while not self.ping_stop_event.is_set():
            count += 1
            if PING_LIBRARY_AVAILABLE:
                try:
                    # One echo request per iteration
                    response = pythonping(target, count=1, timeout=1)
                    first_ok = next(
                        (reply for reply in response if reply.success), None
                    )
                    if first_ok is not None:
                        rtt = first_ok.time_elapsed_ms
                        self.log_message(
                            f"Reply from {target}: time={rtt:.2f}ms (seq={count})"
                        )
                    else:
                        self.log_message(f"Request timed out (seq={count})")
                except Exception as ping_error:
                    self.log_message(f"Ping error: {str(ping_error)} (seq={count})")
            else:
                # Fallback: time a TCP connection attempt to port 80
                try:
                    start_time = time.time()
                    # The context manager closes the socket even when
                    # connect_ex raises (e.g. the host name cannot be resolved)
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                        s.settimeout(1)
                        result = s.connect_ex((target, 80))
                        elapsed_time = (time.time() - start_time) * 1000  # ms
                    if result == 0:
                        self.log_message(
                            f"Connected to {target}:80 in {elapsed_time:.2f}ms (seq={count})"
                        )
                    else:
                        self.log_message(
                            f"Failed to connect to {target}:80 (seq={count})"
                        )
                except Exception as socket_error:
                    self.log_message(
                        f"Connection error: {str(socket_error)} (seq={count})"
                    )
            # Pause up to one second; returns at once if Stop is requested
            self.ping_stop_event.wait(1)
        self.log_message(
            f"Continuous ping to {target} stopped after {count} pings."
        )
    except Exception as e:
        self.log_message(f"Error in continuous ping: {str(e)}")
    finally:
        self.ping_running = False
def _execute_ping(self, target: str):
    """Send four probes to ``target`` and log per-probe results and statistics.

    Uses pythonping when available. Otherwise it falls back to timing TCP
    connection attempts to port 80, which is not an ICMP ping. Afterwards
    the hostname and MAC address are looked up through ``get_hostname``
    and ``get_mac_address`` and logged when found. Errors are logged, not
    raised.
    """
    attempts = 4
    try:
        self.log_message(f"Pinging {target} with 4 echo requests...")
        if PING_LIBRARY_AVAILABLE:
            response = pythonping(target, count=attempts, timeout=1)

            # Per-reply results; collect the successful round-trip times once
            rtts = []
            for i, reply in enumerate(response):
                if reply.success:
                    rtt = reply.time_elapsed_ms
                    rtts.append(rtt)
                    self.log_message(f"Reply from {target}: time={rtt:.2f}ms")
                else:
                    self.log_message(f"Request timed out (seq={i+1})")

            # Statistics
            success_count = len(rtts)
            lost = attempts - success_count
            loss_percentage = lost * 100 // attempts
            self.log_message(f"\nPing statistics for {target}:")
            self.log_message(
                f" Packets: Sent = {attempts}, Received = {success_count}, Lost = {lost} ({loss_percentage}% loss)"
            )
            if success_count > 0:
                min_rtt = min(rtts)
                max_rtt = max(rtts)
                avg_rtt = sum(rtts) / success_count
                self.log_message("Approximate round trip times in milliseconds:")
                self.log_message(
                    f" Minimum = {min_rtt:.2f}ms, Maximum = {max_rtt:.2f}ms, Average = {avg_rtt:.2f}ms"
                )
        else:
            # Fallback when pythonping is unavailable
            self.log_message("Using socket connection test (not a true ICMP ping)")
            times = []
            for i in range(attempts):
                try:
                    start_time = time.time()
                    # The context manager closes the socket even when
                    # connect_ex raises (e.g. the host name cannot be resolved)
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                        s.settimeout(1)
                        result = s.connect_ex((target, 80))
                        elapsed_time = (time.time() - start_time) * 1000  # ms
                    if result == 0:
                        times.append(elapsed_time)
                        self.log_message(
                            f"Connected to {target}:80 in {elapsed_time:.2f}ms"
                        )
                    else:
                        self.log_message(
                            f"Failed to connect to {target}:80 (seq={i+1})"
                        )
                except Exception as e:
                    self.log_message(f"Connection error: {str(e)} (seq={i+1})")

            # Statistics
            success_count = len(times)
            failed = attempts - success_count
            loss_percentage = failed * 100 // attempts
            self.log_message(f"\nConnection statistics for {target}:")
            self.log_message(
                f" Attempts: Sent = {attempts}, Successful = {success_count}, Failed = {failed} ({loss_percentage}% loss)"
            )
            if success_count > 0:
                self.log_message("Approximate connection times in milliseconds:")
                self.log_message(
                    f" Minimum = {min(times):.2f}ms, Maximum = {max(times):.2f}ms, Average = {sum(times)/len(times):.2f}ms"
                )

        # Extra host details, logged only when available
        hostname = self.get_hostname(target)
        mac_address = self.get_mac_address(target)
        if hostname:
            self.log_message(f"Hostname: {hostname}")
        if mac_address:
            self.log_message(f"MAC Address: {mac_address}")
    except Exception as e:
        self.log_message(f"Error executing ping: {str(e)}")
def start_scan(self):
    """Validate the requested IPv4 range and launch a background ping scan.

    The start and end addresses are read from ``self.scan_start_ip`` and
    ``self.scan_end_ip``.  On success the results tree, the result list and
    the progress bar are reset and ``self._execute_scan`` is started in a
    daemon thread with ``(start, end, address_count)``.

    Nothing is started, and a dialog is shown instead, when:

    * a scan is already running (two scans would share the same queue,
      worker list and progress counter),
    * either address is not a valid IPv4 address, or
    * the start address is higher than the end address.
    """
    if getattr(self, "scan_running", False):
        self.show_info(
            "A network scan is already running. Stop it before starting a new one."
        )
        return

    start_ip = self.scan_start_ip.get().strip()
    end_ip = self.scan_end_ip.get().strip()

    # Only the parsing is guarded so that an unrelated ValueError raised
    # further down is not misreported as an invalid address.
    try:
        start = ipaddress.IPv4Address(start_ip)
        end = ipaddress.IPv4Address(end_ip)
    except ValueError as e:
        self.show_error(f"Invalid IP address: {str(e)}")
        return

    if start > end:
        self.show_error("Start IP must be lower than or equal to End IP")
        return

    # Clear previous results
    for item in self.scan_results_tree.get_children():
        self.scan_results_tree.delete(item)
    self.scan_results = []

    # The address count drives both the progress bar and the node counter.
    ip_range = int(end) - int(start) + 1
    self.scan_progress_var.set(0)
    self.scan_progress["maximum"] = ip_range
    self.nodes_to_scan.set(str(ip_range))
    self.log_message(
        f"Starting scan from {start_ip} to {end_ip} ({ip_range} addresses)..."
    )

    # Reset the stop event and mark the scan as running before the thread
    # starts, so the thread never observes a stale state.
    self.scan_stop_event.clear()
    self.scan_running = True
    threading.Thread(
        target=self._execute_scan, args=(start, end, ip_range), daemon=True
    ).start()
def stop_scan(self):
    """Ask a running scan to stop and briefly wait for its workers.

    Does nothing when no scan is running.  Otherwise the stop event is set,
    every worker that is still alive is joined for at most half a second,
    and the running flag is cleared.
    """
    if not self.scan_running:
        return

    self.log_message("Stopping network scan...")
    self.scan_stop_event.set()

    # Bounded join: a worker stuck in a ping must not block the caller forever.
    for worker in self.scan_threads:
        if not worker.is_alive():
            continue
        worker.join(0.5)

    self.scan_running = False
    self.log_message("Network scan stopped.")
def _execute_scan(self, start_ip, end_ip, ip_range):
"""Execute the network scan - now only performs ping scan"""
try:
# Empty the queue
while not self.scan_queue.empty():
self.scan_queue.get_nowait()
# Clear threads list
self.scan_threads = []
# Reset counter for finished IPs
self.scan_completed = 0
# Clear previous scan results tree
for item in self.scan_results_tree.get_children():
self.scan_results_tree.delete(item)
# Fill queue with all IPs in range
for ip_int in range(int(start_ip), int(end_ip) + 1):
self.scan_queue.put(ipaddress.IPv4Address(ip_int))
# Start worker threads for parallel scanning
max_threads = min(
20, ip_range
) # Limit to 20 threads or fewer if range is small
for i in range(max_threads):
thread = threading.Thread(target=self._scan_worker, daemon=True)
thread.start()
self.scan_threads.append(thread)
# Wait for threads to complete or be stopped
while self.scan_running and any(t.is_alive() for t in self.scan_threads):
time.sleep(0.1)
if not self.scan_stop_event.is_set():
self.log_message(
f"Scan completed. Found {len(self.scan_results)} active hosts. Use 'Get Host Info' to lookup hostnames and MAC addresses."
)
except Exception as e:
self.log_message(f"Error in network scan: {str(e)}")
finally:
self.scan_running = False
def _scan_worker(self):
"""Worker thread function to scan IPs from the queue - now only performs ping scan"""
while not self.scan_stop_event.is_set():
try:
# Get next IP from queue with timeout
try:
ip_address = self.scan_queue.get(timeout=0.1)
except queue.Empty:
# No more IPs to scan
break
# Ping the IP
is_active = self._ping_host(str(ip_address))
# If active, add to results - but don't get host info yet
if is_active:
ip_str = str(ip_address)
self.log_message(f"Host discovered: {ip_str}")
self.scan_results.append(ip_str)
# Add to results tree with placeholder values
self.master.after(
0, lambda ip=ip_str: self.add_scan_result(ip, "", "")
)
# Update progress
self.scan_completed = self.scan_completed + 1
self.scan_progress_var.set(self.scan_completed)
# Mark task as done
self.scan_queue.task_done()
except Exception as e:
self.log_message(f"Error in scan worker: {str(e)}")
def gather_host_information(self):
    """Start a background lookup of hostname and MAC for discovered hosts.

    Shows an information dialog instead when no scan results exist yet.
    """
    if self.scan_results:
        # Run the lookups in a daemon thread so this call returns at once.
        lookup_thread = threading.Thread(
            target=self._execute_host_gathering, daemon=True
        )
        lookup_thread.start()
        return

    self.show_info("No hosts discovered yet. Run a scan first.")
def _execute_host_gathering(self):
"""Perform the actual host information gathering"""
try:
self.log_message(
f"Gathering information for {len(self.scan_results)} hosts..."
)
# Reset progress bar for this operation
total_hosts = len(self.scan_results)
self.scan_progress_var.set(0)
self.scan_progress["maximum"] = total_hosts
# Get information for each host
for i, ip in enumerate(self.scan_results):
if self.scan_stop_event.is_set():
self.log_message("Host information gathering stopped.")
break
self.log_message(
f"Getting information for host {ip} ({i+1}/{total_hosts})..."
)
# Get hostname and MAC
hostname = self.get_hostname(ip)
mac_address = self.get_mac_address(ip)
# Log what we found
if hostname:
self.log_message(f" Hostname: {hostname}")
else:
self.log_message(f" Hostname: Not resolved")
if mac_address:
self.log_message(f" MAC Address: {mac_address}")
else:
self.log_message(f" MAC Address: Not found")
# Update the tree
self.update_host_in_tree(ip, hostname, mac_address)
# Update progress
self.scan_progress_var.set(i + 1)
self.log_message("Host information gathering completed.")
except Exception as e:
self.log_message(f"Error gathering host information: {str(e)}")
def update_host_in_tree(self, ip, hostname, mac):
    """Fill in hostname and MAC for the first tree row whose address is ``ip``.

    Empty values are displayed as "Not resolved" / "Not found".  Rows for
    other addresses are left alone; if no row matches nothing happens.  Any
    exception is logged rather than raised.
    """
    try:
        tree = self.scan_results_tree
        for row_id in tree.get_children():
            row = tree.item(row_id, "values")
            # Skip empty rows and rows that belong to another address.
            if not row or row[0] != ip:
                continue
            tree.item(
                row_id,
                values=(ip, hostname or "Not resolved", mac or "Not found"),
            )
            return
    except Exception as e:
        self.log_message(f"Error updating host in treeview: {str(e)}")
def add_scan_result(self, ip, hostname, mac):
    """Append one row for ``ip`` to the scan results tree.

    Empty ``hostname`` / ``mac`` values are shown as "Not resolved" and
    "Not found".  Any exception is logged rather than raised.
    """
    try:
        row = (ip, hostname or "Not resolved", mac or "Not found")
        self.scan_results_tree.insert("", "end", values=row)
    except Exception as e:
        self.log_message(f"Error adding scan result to UI: {str(e)}")
def _ping_host(self, host: str) -> bool:
"""Ping a host to check if it's active, returns True if host responds"""
try:
if PING_LIBRARY_AVAILABLE:
# Use PythonPing with shorter timeout for scanning
response = pythonping(host, count=1, timeout=0.5)
return response.success()
else:
# Fallback using socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
result = s.connect_ex((host, 80)) # Try common port
s.close()
return result == 0
except:
return False
def set_static_ip(self):
    """Apply the static IP entered in the UI to the selected interface.

    The interface's current configuration is saved first, then the address
    is validated, the IP prefix is added to the history and the change is
    applied through ``execute_admin_script``.  Error dialogs are shown when
    no interface is selected, the address is invalid or the script fails.
    """
    adapter = self.if_var.get()
    if not adapter:
        self.show_error("Please select a network interface")
        return

    # Save the current configuration before changing it.
    self.save_previous_config(adapter)

    # Validate the IP; on failure the second element is the error text.
    valid, outcome = self.validate_ip_input()
    if not valid:
        self.show_error(outcome)
        return
    address = outcome
    mask = self.subnet_mask.get()

    # Record the current IP prefix in the history.
    self.add_to_history(self.ip_prefix.get())

    # Log the equivalent netsh command; the gateway shown is "<prefix>.1".
    gateway = f"{self.ip_prefix.get()}.1"
    self.log_message(
        "Executing network command: "
        + f'netsh interface ip set address "{adapter}" static {address} {mask} {gateway}'
    )

    # Run the privileged script, passing the subnet mask along.
    applied = self.execute_admin_script(
        adapter, address, debug=True, subnet_mask=mask
    )
    if not applied:
        self.show_error("Failed to set static IP. Check the log for details.")
        return

    time.sleep(2)  # Wait for the changes to be applied
    self.refresh_interfaces()
    self.log_message(
        f"Successfully set static IP {address} with mask {mask} on {adapter}"
    )
def set_dhcp(self):
    """Switch the selected interface to DHCP.

    The interface's current configuration is saved first, then the change
    is applied through ``execute_admin_script``.  Error dialogs are shown
    when no interface is selected or the script fails.
    """
    adapter = self.if_var.get()
    if not adapter:
        self.show_error("Please select a network interface")
        return

    # Save the current configuration before changing it.
    self.save_previous_config(adapter)

    # Run the privileged script in DHCP mode.
    enabled = self.execute_admin_script(adapter, "dhcp", debug=True)
    if not enabled:
        self.show_error("Failed to enable DHCP. Check the log for details.")
        return

    time.sleep(2)  # Wait for the changes to be applied
    self.refresh_interfaces()
    self.log_message(f"Successfully enabled DHCP on {adapter}")
def run_command(self, cmd: str) -> bool:
    """Run ``cmd`` through the shell, streaming its output to the log.

    Each stdout line is logged (stripped) as soon as it is produced; stderr
    is collected in the background and logged in one piece afterwards.

    Args:
        cmd: Complete command line.  It is interpreted by the shell, so it
            must never contain unescaped untrusted input.

    Returns:
        True when the command exits with status 0, False on a non-zero
        status or when the command could not be run at all.
    """
    self.log_message(f"Executing command: {cmd}")
    try:
        # The context manager closes both pipes and reaps the process.
        with subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        ) as process:
            # Drain stderr concurrently: reading stdout to EOF first would
            # deadlock as soon as the child fills the stderr pipe buffer.
            stderr_chunks = []
            stderr_reader = threading.Thread(
                target=lambda: stderr_chunks.append(process.stderr.read()),
                daemon=True,
            )
            stderr_reader.start()

            for line in process.stdout:
                self.log_message(line.strip())

            stderr_reader.join()
            retcode = process.wait()

        error_output = "".join(stderr_chunks)
        if error_output:
            self.log_message(f"Error output:\n{error_output}")

        if retcode == 0:
            self.log_message("Command executed successfully")
            return True

        self.log_message(f"Command failed with return code: {retcode}")
        return False
    except Exception as e:
        self.log_message(f"Error executing command: {str(e)}")
        return False
def is_admin(self):
    """Return the result of shell32's ``IsUserAnAdmin`` check.

    If the call cannot be made (for example ``ctypes.windll`` is missing),
    the error is logged and False is returned.
    """
    try:
        shell32 = ctypes.windll.shell32
        return shell32.IsUserAnAdmin()
    except Exception as e:
        self.log_message(f"Error checking admin privileges: {str(e)}")
        return False
def elevate_privileges(self):
    """Relaunch this script with the "runas" verb and quit this instance.

    Only acts when ``sys.platform`` is "win32".  The user is informed
    first; if the relaunch call raises, the error is logged and an error
    dialog is shown instead of quitting.
    """
    if sys.platform != "win32":
        return

    self.show_info(
        "The application needs administrator privileges to change network settings. "
        "Please confirm the UAC prompt."
    )
    script_path = os.path.abspath(sys.argv[0])
    quoted_script = f'"{script_path}"'
    try:
        # Start the same interpreter on the same script via the "runas" verb.
        ctypes.windll.shell32.ShellExecuteW(
            None, "runas", sys.executable, quoted_script, None, 1
        )
        self.master.quit()
    except Exception as e:
        self.log_message(f"Error elevating privileges: {str(e)}")
        self.show_error("Failed to elevate privileges")
def show_error(self, message: str):
    """Log ``message`` with an ``ERROR:`` prefix, then show it in an error dialog."""
    log_line = f"ERROR: {message}"
    self.log_message(log_line)
    messagebox.showerror("Error", message)
def show_info(self, message: str):
    """Log ``message`` with an ``INFO:`` prefix, then show it in an information dialog."""
    log_line = f"INFO: {message}"
    self.log_message(log_line)
    messagebox.showinfo("Information", message)
# Subnet mask <-> CIDR prefix-length conversion and synchronisation helpers
def on_subnet_mask_changed(self, *args):
    """Mirror a subnet mask change into the CIDR field and the scan range.

    When the mask is valid, its prefix length is written to
    ``self.cidr_bits`` and, if a current IP is known, the scan range is
    recalculated.  A per-instance flag makes the method a no-op while it is
    already running, which stops it from re-entering itself.  ``*args`` is
    accepted and ignored.
    """
    if getattr(self, "_updating_mask", False):
        return

    try:
        self._updating_mask = True
        mask = self.subnet_mask.get()
        if not self.is_valid_subnet_mask(mask):
            return

        prefix_len = self.subnet_mask_to_cidr(mask)
        self.cidr_bits.set(str(prefix_len))

        # The scan range can only follow when a current IP is known.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)
    except Exception as e:
        self.log_message(f"Error updating CIDR bits: {str(e)}")
    finally:
        self._updating_mask = False
def on_cidr_bits_changed(self, *args):
    """Mirror a CIDR prefix-length change into the subnet mask and scan range.

    When the field holds a number from 0 to 32, the matching dotted mask is
    written to ``self.subnet_mask`` and, if a current IP is known, the scan
    range is recalculated.  A per-instance flag makes the method a no-op
    while it is already running, which stops it from re-entering itself.
    ``*args`` is accepted and ignored.
    """
    if getattr(self, "_updating_bits", False):
        return

    try:
        self._updating_bits = True
        raw_bits = self.cidr_bits.get()
        if not raw_bits.isdigit():
            return

        prefix_len = int(raw_bits)
        if prefix_len < 0 or prefix_len > 32:
            return

        mask = self.cidr_to_subnet_mask(prefix_len)
        self.subnet_mask.set(mask)

        # The scan range can only follow when a current IP is known.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)
    except Exception as e:
        self.log_message(f"Error updating subnet mask: {str(e)}")
    finally:
        self._updating_bits = False
def is_valid_subnet_mask(self, mask: str) -> bool:
    """Return True when ``mask`` is a dotted-decimal IPv4 subnet mask.

    The mask must consist of four numeric octets in the range 0-255 whose
    32 bits are a run of ones followed by a run of zeros (either run may be
    empty).  Any other input, including non-string values, yields False.
    """
    try:
        octets = mask.split(".")
        if len(octets) != 4:
            return False

        if any(not octet.isdigit() or not 0 <= int(octet) <= 255 for octet in octets):
            return False

        # Pack the four octets into one 32-bit number.
        value = 0
        for octet in octets:
            value = (value << 8) | int(octet)

        # The ones are contiguous and leading exactly when the complement is
        # 2**k - 1, i.e. when adding one to it clears every bit it had set.
        host_bits = value ^ 0xFFFFFFFF
        return (host_bits & (host_bits + 1)) == 0
    except Exception:
        return False
def subnet_mask_to_cidr(self, mask: str) -> int:
    """Return the number of set bits in the dotted-decimal ``mask``.

    For a valid subnet mask this is its CIDR prefix length.  If the mask
    cannot be parsed the error is logged and 24 is returned.
    """
    try:
        # bin() output only contains the digit "1" for bits that are set.
        return sum(bin(int(octet)).count("1") for octet in mask.split("."))
    except Exception as e:
        self.log_message(f"Error converting subnet mask to CIDR: {str(e)}")
        return 24  # Default to /24
def cidr_to_subnet_mask(self, bits: int) -> str:
    """Return the dotted-decimal subnet mask for a prefix length of ``bits``.

    If ``bits`` cannot be used to build the mask (for example it is not an
    integer) the error is logged and "255.255.255.0" is returned.
    """
    try:
        # ``bits`` ones followed by zeros; only the first 32 digits are used.
        pattern = "1" * bits + "0" * (32 - bits)
        value = int(pattern[:32], 2)
        return ".".join(str((value >> shift) & 0xFF) for shift in (24, 16, 8, 0))
    except Exception as e:
        self.log_message(f"Error converting CIDR to subnet mask: {str(e)}")
        return "255.255.255.0"  # Default to 255.255.255.0
def update_ping_target(self, ip_prefix):
    """Show the ping target stored for ``ip_prefix`` in the ping target field.

    Does nothing when ``ip_prefix`` is empty.
    """
    if not ip_prefix:
        return
    self.ping_target.set(self.config.get_ping_target(ip_prefix))
def save_current_ping_target(self, event=None):
    """Store the ping target field's value for the current IP prefix.

    Nothing is stored when either the prefix or the target is empty.
    ``event`` is accepted so the method can be used as an event callback;
    it is not used.
    """
    ip_prefix = self.ip_prefix.get()
    target = self.ping_target.get()
    if not (ip_prefix and target):
        return

    self.config.set_ping_target(ip_prefix, target)
    self.log_message(f"Saved ping target {target} for IP prefix {ip_prefix}")
def copy_to_clipboard(self, value):
    """Replace the clipboard contents with ``value`` and log the copy."""
    root = self.master
    # Clear first so the clipboard ends up holding only the new value.
    root.clipboard_clear()
    root.clipboard_append(value)
    self.log_message(f"Copied to clipboard: {value}")
def copy_current_ip(self):
    """Copy the displayed current IP to the clipboard, or report that none is set."""
    current = self.current_ip_var.get()
    if not current:
        self.show_info("No IP address available to copy")
        return
    self.copy_to_clipboard(current)
def copy_current_gateway(self):
    """Copy the displayed current gateway to the clipboard, or report that none is set."""
    current = self.current_gateway_var.get()
    if not current:
        self.show_info("No gateway address available to copy")
        return
    self.copy_to_clipboard(current)
def update_scan_ip_range(self, ip_address, subnet_mask):
    """Derive the scan range fields from an address and its subnet mask.

    Writes the prefix length, the first and last address to scan, and the
    number of addresses to the corresponding UI variables and logs the new
    range.  Networks with more than two addresses exclude the network and
    broadcast addresses; smaller ones (/31, /32) use every address.  Any
    error is logged rather than raised.
    """
    try:
        # Parsing the address first rejects invalid input before any field
        # is modified.
        ipaddress.IPv4Address(ip_address)
        mask_value = int(ipaddress.IPv4Address(subnet_mask))

        # The prefix length is the number of set bits in the mask.
        prefix_len = bin(mask_value).count("1")
        self.scan_cidr_bits.set(str(prefix_len))

        # strict=False: ip_address is a host address, not the network address.
        net = ipaddress.IPv4Network(f"{ip_address}/{prefix_len}", strict=False)
        first, last = net.network_address, net.broadcast_address
        if net.num_addresses > 2:
            # Skip the network address and the broadcast address.
            first, last = first + 1, last - 1

        self.scan_start_ip.set(str(first))
        self.scan_end_ip.set(str(last))

        nodes_count = int(last) - int(first) + 1
        self.nodes_to_scan.set(str(nodes_count))
        self.log_message(
            f"Updated scan range: {first} - {last} ({nodes_count} nodes)"
        )
    except Exception as e:
        self.log_message(f"Error calculating IP range: {str(e)}")
def update_scan_range_from_cidr(self, *args):
    """Recompute the scan End IP when the scan CIDR field changes.

    The Start IP is kept as entered; only the End IP and the node count are
    updated, using the network that contains the Start IP at the given
    prefix length.  Nothing happens when the Start IP is empty or the CIDR
    field is not a number from 0 to 32.  Any error is logged rather than
    raised.  ``*args`` is accepted and ignored.
    """
    try:
        first_text = self.scan_start_ip.get().strip()
        prefix_text = self.scan_cidr_bits.get().strip()
        if not (first_text and prefix_text.isdigit()):
            return

        prefix_len = int(prefix_text)
        if prefix_len < 0 or prefix_len > 32:
            return

        # strict=False: the Start IP is a host address inside the network.
        net = ipaddress.IPv4Network(f"{first_text}/{prefix_len}", strict=False)

        # End at the last usable address; /31 and /32 have no separate
        # broadcast address to skip.
        last = net.broadcast_address
        if net.num_addresses > 2:
            last = last - 1
        self.scan_end_ip.set(str(last))

        first = ipaddress.IPv4Address(first_text)
        nodes_count = int(last) - int(first) + 1
        self.nodes_to_scan.set(str(nodes_count))
        self.log_message(
            f"Updated scan range from CIDR: {first_text} - {last} ({nodes_count} nodes)"
        )
    except Exception as e:
        self.log_message(f"Error updating scan range from CIDR: {str(e)}")
def get_hostname(self, ip_address):
    """Return the hostname that ``ip_address`` reverse-resolves to.

    Returns an empty string when the lookup fails for any reason; the
    failure is logged with a message that depends on the kind of error.
    """
    try:
        self.log_message(f"Resolving hostname for {ip_address}...")
        # gethostbyaddr returns (hostname, aliases, addresses).
        return socket.gethostbyaddr(ip_address)[0]
    except socket.herror as e:
        failure = f"Hostname resolution error for {ip_address}: {e}"
    except socket.gaierror as e:
        failure = f"Address-related error for {ip_address}: {e}"
    except socket.timeout:
        failure = f"Timeout resolving hostname for {ip_address}"
    except Exception as e:
        failure = f"Unknown error resolving hostname for {ip_address}: {str(e)}"

    # Only reached when one of the handlers above ran.
    self.log_message(failure)
    return ""
def get_mac_address(self, ip_address):
    """Return the MAC address the system ARP table holds for ``ip_address``.

    Runs ``arp -a <ip_address>`` with a two-second timeout and extracts the
    first MAC-looking token from its output.  The address is passed to
    ``arp`` as a single argument without going through a shell, so
    characters such as ``;`` or ``&`` in it are never interpreted as
    commands.

    Returns:
        The MAC address exactly as printed by ``arp``, or an empty string
        when none is found, the command fails or it times out.  Failures
        are logged, never raised.
    """
    # Tried in order: hyphen/colon separated, then whitespace/hyphen separated.
    mac_patterns = (
        r"([0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2})",
        r"([0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2})",
    )
    try:
        target = str(ip_address)
        # Argument list + no shell: the address cannot inject shell syntax.
        # The timeout keeps a slow lookup from hanging the caller.
        result = subprocess.run(
            ["arp", "-a", target],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0 and result.stdout:
            # Log the raw output for debugging
            self.log_message(
                f"ARP output for {ip_address}: {result.stdout.strip()}"
            )

            for pattern in mac_patterns:
                mac_match = re.search(pattern, result.stdout, re.IGNORECASE)
                if mac_match:
                    return mac_match.group(0)

            # Last resort: on the line that mentions the address, accept the
            # second column if it is six hex pairs with any separator.
            for line in result.stdout.strip().split("\n"):
                if target not in line:
                    continue
                parts = line.split()
                if len(parts) >= 2 and re.match(
                    r"([0-9a-f]{2}[^\w]){5}[0-9a-f]{2}",
                    parts[1],
                    re.IGNORECASE,
                ):
                    return parts[1]

        self.log_message(f"No MAC address found for {ip_address}")
        return ""
    except subprocess.TimeoutExpired:
        self.log_message(f"Timeout running ARP command for {ip_address}")
        return ""
    except Exception as e:
        self.log_message(f"Error getting MAC address for {ip_address}: {str(e)}")
        return ""
def main():
    """Create the Tk root window, build the application on it and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object stays referenced while the
    # event loop runs.
    application = IPChangerApp(window)
    window.mainloop()


# Start the GUI only when the file is executed as a script.
if __name__ == "__main__":
    main()