# IPchangeNG/menu-ip-change.py
#
# 2028 lines
# 78 KiB
# Python

import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import subprocess
import re
import ctypes
import sys
import ipaddress
from typing import List, Optional, Dict
from dataclasses import dataclass
import threading
import time
import socket
import queue
import struct
# Optional third-party dependencies.
# Each import below is attempted once at module load. The matching
# *_AVAILABLE flag records whether it succeeded, and a hint is printed to
# stdout when the package is missing.

# pythonping: ping implementation, bound here to the name `pythonping`.
try:
    from pythonping import ping as pythonping

    PING_LIBRARY_AVAILABLE = True
except ImportError:
    PING_LIBRARY_AVAILABLE = False
    print(
        "PythonPing module not found. Install with 'pip install pythonping' for better ping functionality."
    )
# pysnmp: SNMP support.
# NOTE(review): the star import binds every public pysnmp.hlapi name in this
# module's namespace.
try:
    from pysnmp.hlapi import *

    SNMP_AVAILABLE = True
except ImportError:
    SNMP_AVAILABLE = False
    print(
        "pysnmp module not found. Install with 'pip install pysnmp' for SNMP functionality."
    )
# wmi: used for network interface detection; the printed message states that
# an alternative method is used when it is missing.
try:
    import wmi

    WMI_AVAILABLE = True
except ImportError:
    WMI_AVAILABLE = False
    print(
        "WMI module not found. Using alternative method for network interface detection."
    )
# mac_vendor_lookup: MAC address -> vendor name lookup.
# NOTE: `mac_lookup` is only bound when the import succeeds, so check
# MAC_LOOKUP_AVAILABLE before touching it.
try:
    from mac_vendor_lookup import MacLookup

    MAC_LOOKUP_AVAILABLE = True
    # Initialize the MAC lookup service
    mac_lookup = MacLookup()
    # Update the database if needed (this can be time-consuming)
    # mac_lookup.update_vendors() # Uncomment to update database on startup
except ImportError:
    MAC_LOOKUP_AVAILABLE = False
    print(
        "mac-vendor-lookup module not found. Install with 'pip install mac-vendor-lookup' for MAC vendor lookup functionality."
    )
@dataclass
class NetworkInterface:
    """One network adapter together with its IPv4 settings.

    Only ``name`` and ``description`` are required; every other field has a
    default value.
    """

    # Identification (required, positional order matters to callers).
    name: str
    description: str

    # IPv4 settings; each defaults to an empty string.
    ip_address: str = ""
    subnet_mask: str = ""
    gateway: str = ""

    # State flags; both default to True.
    dhcp_enabled: bool = True
    active: bool = True

    # Adapter identifier; defaults to an empty string.
    adapter_id: str = ""
class IPChangeConfig:
    """Persisted application settings stored as JSON files.

    File names are relative, so they resolve against the current working
    directory.

    Attributes:
        config_file: JSON file with the last interface, last IP prefix and
            the previously saved interface configuration.
        history_file: JSON file name for the IP prefix history (this class
            only stores the name; it does not read or write the file).
        ping_targets_file: JSON file mapping IP prefix -> ping target.
        last_interface: Name of the last selected interface ("" if none).
        last_ip_prefix: Last IP prefix used ("" if none).
        previous_config: Dict describing the previous interface settings.
        ping_targets: Dict mapping IP prefix -> ping target.
    """

    def __init__(self):
        self.config_file = "ip_config.json"
        self.history_file = "ip_history.json"
        self.ping_targets_file = "ping_targets.json"
        self.load_config()
        self.load_ping_targets()

    def load_config(self):
        """Load settings from ``config_file``, falling back to empty defaults.

        Any failure (unreadable file, invalid JSON, unexpected JSON shape) is
        reported on stdout and leaves all three settings at their defaults.
        """
        settings = {"last_interface": "", "last_ip_prefix": "", "previous_config": {}}
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, "r") as f:
                    loaded = json.load(f)
                # Built as a whole so a failure never leaves a half-read state.
                settings = {
                    key: loaded.get(key, default) for key, default in settings.items()
                }
        except Exception as e:
            print(f"Error loading config: {e}")
        self.last_interface = settings["last_interface"]
        self.last_ip_prefix = settings["last_ip_prefix"]
        self.previous_config = settings["previous_config"]

    def save_config(self):
        """Write the current settings to ``config_file``; report failures on stdout."""
        try:
            config = {
                "last_interface": self.last_interface,
                "last_ip_prefix": self.last_ip_prefix,
                "previous_config": self.previous_config,
            }
            with open(self.config_file, "w") as f:
                json.dump(config, f)
        except Exception as e:
            print(f"Error saving config: {e}")

    def load_ping_targets(self):
        """Load the saved ping targets for each IP prefix.

        ``ping_targets`` is always a dict afterwards: the file content when it
        holds a JSON object, otherwise an empty dict.
        """
        self.ping_targets = {}
        try:
            if os.path.exists(self.ping_targets_file):
                with open(self.ping_targets_file, "r") as f:
                    loaded = json.load(f)
                # The lookup code calls dict methods, so reject any other shape.
                if isinstance(loaded, dict):
                    self.ping_targets = loaded
                else:
                    print("Error loading ping targets: expected a JSON object")
        except Exception as e:
            print(f"Error loading ping targets: {e}")
            self.ping_targets = {}

    def save_ping_targets(self):
        """Write ``ping_targets`` to ``ping_targets_file``; report failures on stdout."""
        try:
            with open(self.ping_targets_file, "w") as f:
                json.dump(self.ping_targets, f)
        except Exception as e:
            print(f"Error saving ping targets: {e}")

    @staticmethod
    def _is_dotted_prefix(shorter, longer):
        """Return True when ``longer`` extends ``shorter`` by whole dotted fields."""
        return bool(shorter) and longer.startswith(shorter + ".")

    def get_ping_target(self, ip_prefix):
        """Return the ping target saved for ``ip_prefix``.

        Lookup order:
            1. exact key match;
            2. a saved key that is a dotted-field prefix of ``ip_prefix`` or
               the other way round (compared on "." boundaries, so
               "192.168.1" never matches "192.168.10");
            3. the default ``f"{ip_prefix}.1"``.

        An empty ``ip_prefix`` yields "".
        """
        if not ip_prefix:
            return ""
        # Try to find exact match first
        if ip_prefix in self.ping_targets:
            return self.ping_targets[ip_prefix]
        # Otherwise accept a related prefix, but only on octet boundaries
        for prefix, target in self.ping_targets.items():
            if self._is_dotted_prefix(prefix, ip_prefix) or self._is_dotted_prefix(
                ip_prefix, prefix
            ):
                return target
        # Default target: IP prefix + ".1"
        return f"{ip_prefix}.1"

    def set_ping_target(self, ip_prefix, target):
        """Save ``target`` for ``ip_prefix`` and persist; ignored if either is empty."""
        if ip_prefix and target:
            self.ping_targets[ip_prefix] = target
            self.save_ping_targets()
class IPChangerApp:
def __init__(self, master):
    """Build the main window, load persisted state and populate the UI.

    Args:
        master: Tk window that hosts the application; its title, geometry
            and grid weights are configured here.
    """
    self.master = master
    self.master.title("Network Interface IP Configuration")
    self.master.geometry("900x700")
    # Initialise the configuration before anything else
    self.config = IPChangeConfig()
    # Initialise data structures
    self.interfaces: List[NetworkInterface] = []
    self.ip_history: List[str] = []
    # Variables for continuous ping
    self.continuous_ping = tk.BooleanVar(value=False)
    self.ping_running = False
    self.ping_stop_event = threading.Event()
    self.last_ping_time = tk.StringVar(
        value="N/A"
    )  # Add variable for last ping time
    # Variables for IP scanning
    self.scan_results = []
    self.scan_running = False
    self.scan_stop_event = threading.Event()
    self.scan_queue = queue.Queue()
    self.scan_threads = []
    self.scan_progress_var = tk.IntVar(value=0)
    # Initialise WMI if available
    if WMI_AVAILABLE:
        self.wmi_client = wmi.WMI()
    # Create the tabbed interface first: load_ip_history() reports failures
    # through log_message(), which writes to the log widget built here.
    self.create_widgets()
    # Load saved data now that the log widget exists
    self.load_ip_history()
    # Initialize trace variables
    self.subnet_trace_id = None
    self.cidr_trace_id = None
    # Set up initial traces safely
    self.setup_mask_traces()
    # Update the list of IPs shown in the history combo
    self.update_history_display()
    # Refresh interfaces
    self.refresh_interfaces()
    # Configure grid weights
    self.master.grid_columnconfigure(0, weight=1)
    self.master.grid_rowconfigure(1, weight=1)  # The log row expands
    # Initialize MAC lookup if available
    if MAC_LOOKUP_AVAILABLE:
        self.mac_lookup = MacLookup()
def setup_mask_traces(self):
    """Attach write traces to the subnet mask and CIDR variables."""
    mask_var = self.subnet_mask
    cidr_var = self.cidr_bits
    # Write each variable's current value back to itself before tracing
    mask_var.set(mask_var.get())
    cidr_var.set(cidr_var.get())
    # Register the change handlers with the trace("w", ...) form
    mask_var.trace("w", self.on_subnet_mask_changed)
    cidr_var.trace("w", self.on_cidr_bits_changed)
    self.log_message("Subnet mask traces initialized")
def remove_mask_traces(self):
    """No-op placeholder: traces are not removed (a different approach is used)."""
    return None  # Intentionally does nothing
def get_ip_prefix(self, ip: str) -> str:
    """Return the first three dot-separated fields of ``ip``.

    Strings with fewer than three fields are returned unchanged.
    """
    octets = ip.split(".")
    return ip if len(octets) < 3 else ".".join(octets[:3])
def create_widgets(self):
    """Build the top-level layout: a two-tab notebook plus the shared log panel.

    The notebook is gridded in row 0 of the master window and the
    "Operation Log" frame in row 1. Tab contents are delegated to
    create_ip_setup_tab() and create_tools_tab().
    """
    # Create a notebook (tabbed interface)
    self.notebook = ttk.Notebook(self.master)
    self.notebook.grid(row=0, column=0, sticky="nsew", padx=10, pady=10)
    # Create frames for each tab
    self.ip_setup_tab = ttk.Frame(self.notebook, padding="10")
    self.tools_tab = ttk.Frame(self.notebook, padding="10")
    # Configure tab frames to expand
    self.ip_setup_tab.columnconfigure(0, weight=1)
    self.tools_tab.columnconfigure(0, weight=1)
    # Add tabs to the notebook
    self.notebook.add(self.ip_setup_tab, text="IP Setup")
    self.notebook.add(self.tools_tab, text="Tools")
    # Create IP Setup tab content (moved from original layout)
    self.create_ip_setup_tab()
    # Create Tools tab content
    self.create_tools_tab()
    # Log frame - shared between tabs (child of the master, not of a tab)
    self.log_frame = ttk.LabelFrame(self.master, text="Operation Log", padding="5")
    self.log_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10))
    # Use a better font and enable tag configuration
    log_font = (
        "Consolas",
        10,
    )  # Use a monospaced font like Consolas or Courier New
    self.log_text = tk.Text(self.log_frame, wrap="none", height=10, font=log_font)
    # Create tags for different message types with specific formatting.
    # These tag names are the ones log_message() applies to inserted text.
    self.log_text.tag_configure("INFO", foreground="blue")
    self.log_text.tag_configure(
        "ERROR", foreground="red", font=(log_font[0], log_font[1], "bold")
    )
    self.log_text.tag_configure(
        "SUCCESS", foreground="green", font=(log_font[0], log_font[1], "bold")
    )
    self.log_text.tag_configure("WARN", foreground="orange")
    self.log_text.tag_configure("PING", foreground="purple")
    self.log_text.tag_configure(
        "DISCOVERY", foreground="navy", font=(log_font[0], log_font[1], "bold")
    )
    self.log_text.tag_configure("HOSTNAME", foreground="teal")
    self.log_text.tag_configure("MAC", foreground="maroon")
    self.log_text.tag_configure("COMMAND", foreground="sienna")
    self.log_text.tag_configure("VENDOR", foreground="#8B008B")  # Dark magenta
    # Vertical and horizontal scrollbars wired to the log text widget
    self.log_scrollbar_y = ttk.Scrollbar(
        self.log_frame, orient="vertical", command=self.log_text.yview
    )
    self.log_scrollbar_x = ttk.Scrollbar(
        self.log_frame, orient="horizontal", command=self.log_text.xview
    )
    self.log_text.configure(
        yscrollcommand=self.log_scrollbar_y.set,
        xscrollcommand=self.log_scrollbar_x.set,
    )
    # Layout inside the log frame: text at (0, 0), scrollbars beside and
    # below it, and the "Clear Log" button spanning row 2
    self.log_text.grid(row=0, column=0, sticky="nsew")
    self.log_scrollbar_y.grid(row=0, column=1, sticky="ns")
    self.log_scrollbar_x.grid(row=1, column=0, sticky="ew")
    ttk.Button(self.log_frame, text="Clear Log", command=self.clear_log).grid(
        row=2, column=0, columnspan=2, pady=5
    )
    # Only the text widget's cell receives extra space on resize
    self.log_frame.grid_columnconfigure(0, weight=1)
    self.log_frame.grid_rowconfigure(0, weight=1)
def create_ip_setup_tab(self):
    """Populate the "IP Setup" tab.

    Builds, inside ``self.control_frame`` from top to bottom: the interface
    selector (row 0), the current configuration read-out (row 1), the IP
    configuration entries (row 2), the IP history selector (row 3) and the
    action buttons (row 4).
    """
    # Control frame for IP Setup tab
    self.control_frame = ttk.Frame(self.ip_setup_tab)
    self.control_frame.grid(row=0, column=0, sticky="nsew")
    self.control_frame.columnconfigure(0, weight=1)
    # Interfaces section
    self.interfaces_frame = ttk.LabelFrame(
        self.control_frame, text="Network Interfaces", padding="5"
    )
    self.interfaces_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    self.interfaces_frame.columnconfigure(
        1, weight=1
    )  # Make combo box column expandable
    ttk.Label(self.interfaces_frame, text="Select Interface:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.if_var = tk.StringVar()
    self.if_combo = ttk.Combobox(
        self.interfaces_frame, textvariable=self.if_var, state="readonly", width=50
    )
    self.if_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.if_combo.bind("<<ComboboxSelected>>", self.on_interface_selected)
    # Current configuration info
    self.current_frame = ttk.LabelFrame(
        self.control_frame, text="Current Configuration", padding="5"
    )
    self.current_frame.grid(row=1, column=0, sticky="ew", pady=(0, 10))
    self.current_frame.columnconfigure(1, weight=1)  # Make value column expandable
    # Variables behind the read-only labels below; on_interface_selected()
    # fills them in
    self.current_ip_var = tk.StringVar()
    self.current_mask_var = tk.StringVar()
    self.current_gateway_var = tk.StringVar()
    self.dhcp_status_var = tk.StringVar()
    # Current IP and copy button - aligned left
    ttk.Label(self.current_frame, text="Current IP:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    ip_frame = ttk.Frame(self.current_frame)
    ip_frame.grid(row=0, column=1, sticky="w")
    ttk.Label(ip_frame, textvariable=self.current_ip_var).pack(side=tk.LEFT)
    ttk.Button(ip_frame, text="Copy", width=5, command=self.copy_current_ip).pack(
        side=tk.LEFT, padx=5
    )
    ttk.Label(self.current_frame, text="Subnet Mask:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    ttk.Label(self.current_frame, textvariable=self.current_mask_var).grid(
        row=1, column=1, sticky="w"
    )
    # Gateway and copy button - aligned left
    ttk.Label(self.current_frame, text="Gateway:").grid(
        row=2, column=0, sticky="w", padx=5
    )
    gateway_frame = ttk.Frame(self.current_frame)
    gateway_frame.grid(row=2, column=1, sticky="w")
    ttk.Label(gateway_frame, textvariable=self.current_gateway_var).pack(
        side=tk.LEFT
    )
    ttk.Button(
        gateway_frame, text="Copy", width=5, command=self.copy_current_gateway
    ).pack(side=tk.LEFT, padx=5)
    ttk.Label(self.current_frame, text="DHCP Status:").grid(
        row=3, column=0, sticky="w", padx=5
    )
    ttk.Label(self.current_frame, textvariable=self.dhcp_status_var).grid(
        row=3, column=1, sticky="w"
    )
    # IP configuration section.
    # NOTE: self.ip_frame is the LabelFrame; the local `ip_frame` above is a
    # different widget (the row holding the current IP and its Copy button).
    self.ip_frame = ttk.LabelFrame(
        self.control_frame, text="IP Configuration", padding="5"
    )
    self.ip_frame.grid(row=2, column=0, sticky="ew", pady=(0, 10))
    self.ip_frame.columnconfigure(1, weight=1)  # Make IP entry column expandable
    # IP Prefix row - all aligned left
    ttk.Label(self.ip_frame, text="IP Prefix (first 3 octets):").grid(
        row=0, column=0, sticky="w", padx=5
    )
    ip_prefix_frame = ttk.Frame(self.ip_frame)
    ip_prefix_frame.grid(row=0, column=1, sticky="w", padx=5)
    # Pre-filled with the prefix stored in the persisted configuration
    self.ip_prefix = tk.StringVar(value=self.config.last_ip_prefix)
    self.ip_entry = ttk.Entry(
        ip_prefix_frame, textvariable=self.ip_prefix, width=30
    )
    self.ip_entry.pack(side=tk.LEFT)
    ttk.Label(ip_prefix_frame, text="Last Octet:").pack(side=tk.LEFT, padx=5)
    self.last_octet = tk.StringVar(value="249")
    self.last_octet_entry = ttk.Entry(
        ip_prefix_frame, textvariable=self.last_octet, width=5
    )
    self.last_octet_entry.pack(side=tk.LEFT)
    # Subnet mask row - all aligned left
    ttk.Label(self.ip_frame, text="Subnet Mask:").grid(
        row=1, column=0, sticky="w", padx=5
    )
    subnet_frame = ttk.Frame(self.ip_frame)
    subnet_frame.grid(row=1, column=1, sticky="w", padx=5)
    # subnet_mask and cidr_bits get their write traces in setup_mask_traces()
    self.subnet_mask = tk.StringVar(value="255.255.255.0")
    self.subnet_entry = ttk.Entry(
        subnet_frame, textvariable=self.subnet_mask, width=15
    )
    self.subnet_entry.pack(side=tk.LEFT)
    ttk.Label(subnet_frame, text="CIDR (/bits):").pack(side=tk.LEFT, padx=5)
    self.cidr_bits = tk.StringVar(value="24")
    self.cidr_entry = ttk.Entry(subnet_frame, textvariable=self.cidr_bits, width=5)
    self.cidr_entry.pack(side=tk.LEFT)
    # History section
    self.history_frame = ttk.LabelFrame(
        self.control_frame, text="IP History", padding="5"
    )
    self.history_frame.grid(row=3, column=0, sticky="ew", pady=(0, 10))
    self.history_frame.columnconfigure(1, weight=1)  # Make combo column expandable
    ttk.Label(self.history_frame, text="Previous IPs:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.history_var = tk.StringVar()
    self.history_combo = ttk.Combobox(
        self.history_frame,
        textvariable=self.history_var,
        state="readonly",
        width=50,
    )
    self.history_combo.grid(row=0, column=1, sticky="ew", padx=5)
    self.history_combo.bind("<<ComboboxSelected>>", self.on_history_selected)
    # Action buttons
    self.button_frame = ttk.Frame(self.control_frame)
    self.button_frame.grid(row=4, column=0, sticky="ew", pady=(0, 10))
    # Make the button frame distribute buttons evenly
    for i in range(4):  # For the 4 buttons
        self.button_frame.columnconfigure(i, weight=1)
    ttk.Button(
        self.button_frame, text="Set Static IP", command=self.set_static_ip
    ).grid(row=0, column=0, padx=5)
    ttk.Button(self.button_frame, text="Enable DHCP", command=self.set_dhcp).grid(
        row=0, column=1, padx=5
    )
    ttk.Button(
        self.button_frame, text="Restore Previous", command=self.restore_previous
    ).grid(row=0, column=2, padx=5)
    ttk.Button(
        self.button_frame,
        text="Refresh Interfaces",
        command=self.refresh_interfaces,
    ).grid(row=0, column=3, padx=5)
def create_tools_tab(self):
    """Populate the "Tools" tab with the ping tool and the network scanner.

    Row 0 of ``self.tools_control_frame`` holds the "Ping Tool" frame and
    row 1 (the expandable row) holds the "Network Scan" frame with its IP
    range inputs, control buttons, progress bar and results table.
    """
    # Control frame for Tools tab
    self.tools_control_frame = ttk.Frame(self.tools_tab)
    self.tools_control_frame.grid(row=0, column=0, sticky="nsew")
    self.tools_control_frame.columnconfigure(0, weight=1)
    self.tools_control_frame.rowconfigure(
        1, weight=1
    )  # Make the scan frame expandable
    # Network Tools section (ping tool)
    self.ping_frame = ttk.LabelFrame(
        self.tools_control_frame, text="Ping Tool", padding="5"
    )
    self.ping_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10))
    self.ping_frame.columnconfigure(
        1, weight=1
    )  # Make ping target field expandable
    ttk.Label(self.ping_frame, text="Target IP/Host:").grid(
        row=0, column=0, sticky="w", padx=5
    )
    self.ping_target = tk.StringVar()
    self.ping_entry = ttk.Entry(
        self.ping_frame, textvariable=self.ping_target, width=40
    )
    self.ping_entry.grid(row=0, column=1, sticky="w", padx=5)
    # Bind the FocusOut event to save the ping target
    self.ping_entry.bind("<FocusOut>", self.save_current_ping_target)
    # Add last ping time display field
    ttk.Label(self.ping_frame, text="Last RTT:").grid(
        row=0, column=2, sticky="w", padx=5
    )
    # NOTE: this rebinds self.last_ping_time, which __init__ also creates
    # with the same initial value
    self.last_ping_time = tk.StringVar(value="N/A")
    ping_time_display = ttk.Entry(
        self.ping_frame, textvariable=self.last_ping_time, width=8, state="readonly"
    )
    ping_time_display.grid(row=0, column=3, padx=5)
    # Add continuous ping checkbox
    self.continuous_ping_check = ttk.Checkbutton(
        self.ping_frame, text="Continuous Ping", variable=self.continuous_ping
    )
    self.continuous_ping_check.grid(row=0, column=4, padx=5)
    ttk.Button(self.ping_frame, text="Ping", command=self.do_ping).grid(
        row=0, column=5, padx=5
    )
    ttk.Button(self.ping_frame, text="Stop", command=self.stop_ping).grid(
        row=0, column=6, padx=5
    )
    # Network Scan section - make it expand both horizontally and vertically
    self.scan_frame = ttk.LabelFrame(
        self.tools_control_frame, text="Network Scan", padding="5"
    )
    self.scan_frame.grid(row=1, column=0, sticky="nsew", pady=(0, 10))
    # Configure column weights for scan frame to make it expandable
    self.scan_frame.columnconfigure(0, weight=1)  # Single column for frames
    self.scan_frame.rowconfigure(4, weight=1)  # Make results row expandable
    # Use frames to organize elements in a row and aligned left
    # Start IP and End IP row
    ip_range_frame = ttk.Frame(self.scan_frame)
    ip_range_frame.grid(row=0, column=0, sticky="w", padx=5, pady=2)
    ttk.Label(ip_range_frame, text="Start IP:").pack(side=tk.LEFT, padx=(0, 5))
    self.scan_start_ip = tk.StringVar()
    self.scan_start_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_start_ip, width=15
    )
    self.scan_start_entry.pack(side=tk.LEFT)
    ttk.Label(ip_range_frame, text="End IP:").pack(side=tk.LEFT, padx=(10, 5))
    self.scan_end_ip = tk.StringVar()
    self.scan_end_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_end_ip, width=15
    )
    self.scan_end_entry.pack(side=tk.LEFT)
    ttk.Label(ip_range_frame, text="CIDR (/bits):").pack(side=tk.LEFT, padx=(10, 5))
    self.scan_cidr_bits = tk.StringVar(value="24")
    self.scan_cidr_entry = ttk.Entry(
        ip_range_frame, textvariable=self.scan_cidr_bits, width=5
    )
    self.scan_cidr_entry.pack(side=tk.LEFT)
    # Add trace to update the scan range when CIDR changes
    self.scan_cidr_bits.trace("w", self.update_scan_range_from_cidr)
    # Nodes to scan and buttons row
    controls_frame = ttk.Frame(self.scan_frame)
    controls_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=2)
    controls_frame.columnconfigure(
        1, weight=1
    )  # Make space between nodes and buttons expandable
    nodes_frame = ttk.Frame(controls_frame)
    nodes_frame.grid(row=0, column=0, sticky="w")
    ttk.Label(nodes_frame, text="Nodes to scan:").pack(side=tk.LEFT, padx=(0, 5))
    # Read-only display field; initial value "0"
    self.nodes_to_scan = tk.StringVar(value="0")
    nodes_display = ttk.Entry(
        nodes_frame, textvariable=self.nodes_to_scan, width=10, state="readonly"
    )
    nodes_display.pack(side=tk.LEFT)
    # Scan buttons - aligned right
    self.scan_buttons_frame = ttk.Frame(controls_frame)
    self.scan_buttons_frame.grid(row=0, column=1, sticky="e")
    ttk.Button(
        self.scan_buttons_frame, text="Start Scan", command=self.start_scan
    ).pack(side=tk.LEFT, padx=5)
    ttk.Button(
        self.scan_buttons_frame, text="Stop Scan", command=self.stop_scan
    ).pack(side=tk.LEFT, padx=5)
    ttk.Button(
        self.scan_buttons_frame,
        text="Get Host Info",
        command=self.gather_host_information,
    ).pack(side=tk.LEFT, padx=5)
    # Scan progress - keep full width and make it expand horizontally.
    # Driven by self.scan_progress_var, which __init__ creates.
    self.scan_progress = ttk.Progressbar(
        self.scan_frame,
        orient="horizontal",
        length=300,
        mode="determinate",
        variable=self.scan_progress_var,
    )
    self.scan_progress.grid(row=2, column=0, sticky="ew", padx=5, pady=5)
    # Scan results - keep full width
    ttk.Label(self.scan_frame, text="Scan Results:").grid(
        row=3, column=0, sticky="w", padx=5
    )
    # Frame for results list and scrollbar - make it fully expandable
    self.results_frame = ttk.Frame(self.scan_frame)
    self.results_frame.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
    self.results_frame.columnconfigure(0, weight=1)
    self.results_frame.rowconfigure(0, weight=1)
    # Results table: a Treeview with one column per reported attribute
    self.scan_results_tree = ttk.Treeview(
        self.results_frame,
        columns=("ip", "delay", "hostname", "mac", "vendor"),  # Added delay column
        show="headings",
        height=10,
    )
    # Define columns
    self.scan_results_tree.heading("ip", text="IP Address")
    self.scan_results_tree.heading(
        "delay", text="Ping (ms)"
    )  # New column for ping delay
    self.scan_results_tree.heading("hostname", text="Hostname")
    self.scan_results_tree.heading("mac", text="MAC Address")
    self.scan_results_tree.heading("vendor", text="MAC Vendor")
    # Set column widths with adjusted proportions
    # (0.15 + 0.08 + 0.27 + 0.20 + 0.30 = 1.00 of total_width)
    total_width = 650  # Base total width for the columns
    self.scan_results_tree.column(
        "ip", width=int(total_width * 0.15), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "delay", width=int(total_width * 0.08), anchor="center", stretch=True
    )
    self.scan_results_tree.column(
        "hostname", width=int(total_width * 0.27), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "mac", width=int(total_width * 0.20), anchor="w", stretch=True
    )
    self.scan_results_tree.column(
        "vendor", width=int(total_width * 0.30), anchor="w", stretch=True
    )
    # Add scrollbar
    self.scan_results_scrollbar = ttk.Scrollbar(
        self.results_frame, orient="vertical", command=self.scan_results_tree.yview
    )
    self.scan_results_tree.configure(yscrollcommand=self.scan_results_scrollbar.set)
    # Make sure the treeview fills the available space
    self.scan_results_tree.grid(row=0, column=0, sticky="nsew")
    self.scan_results_scrollbar.grid(row=0, column=1, sticky="ns")
def clear_log(self):
    """Erase everything currently shown in the operation log."""
    first_char, end_of_text = "1.0", tk.END
    self.log_text.delete(first_char, end_of_text)
def log_message(self, message: str, tag: Optional[str] = None):
    """Append a timestamped line to the operation log.

    Args:
        message: Text to log.
        tag: Name of the text tag (colour/format) to apply. When omitted the
            tag is guessed from keywords in ``message``; if nothing matches
            the line is inserted untagged.

    If the log widget has not been created yet, the line is printed to
    stdout instead of raising AttributeError.
    """
    # Try to automatically detect message type if tag not specified.
    # The order matters: the first matching rule wins.
    if tag is None:
        if "ERROR:" in message or "Error" in message or "failed" in message.lower():
            tag = "ERROR"
        elif (
            "SUCCESS" in message
            or "Successfully" in message
            or "completed" in message
        ):
            tag = "SUCCESS"
        elif "WARNING" in message or "Warn" in message:
            tag = "WARN"
        elif "Host discovered:" in message:
            tag = "DISCOVERY"
        elif (
            "Reply from" in message
            or "ping" in message.lower()
            or "Request timed out" in message
        ):
            tag = "PING"
        elif "Hostname:" in message:
            tag = "HOSTNAME"
        elif "MAC Address:" in message:
            tag = "MAC"
        elif "MAC Vendor:" in message or "Vendor:" in message:
            tag = "VENDOR"
        elif "Executing" in message and "command" in message.lower():
            tag = "COMMAND"
        elif "INFO:" in message:
            tag = "INFO"
    timestamp = time.strftime("%H:%M:%S", time.localtime())
    log_text = getattr(self, "log_text", None)
    if log_text is None:
        # Called before the log widget exists (e.g. during start-up)
        print(f"[{timestamp}] {message}")
        return
    # Insert the timestamp and message
    log_text.insert(tk.END, f"[{timestamp}] ", "timestamp")
    # Insert the message with appropriate tag
    if tag:
        log_text.insert(tk.END, f"{message}\n", tag)
    else:
        log_text.insert(tk.END, f"{message}\n")
    # Scroll to see the latest message
    log_text.see(tk.END)
    log_text.update_idletasks()
def load_ip_history(self):
    """Load the IP prefix history from ``self.config.history_file``.

    ``self.ip_history`` is always a list of strings afterwards: the stored
    string entries when the file holds a JSON array, otherwise an empty
    list. Problems are reported through log_message() and never raised.
    """
    history = []
    try:
        if os.path.exists(self.config.history_file):
            with open(self.config.history_file, "r") as f:
                loaded = json.load(f)
            if isinstance(loaded, list):
                # Non-string entries cannot be used as IP prefixes; drop them.
                history = [entry for entry in loaded if isinstance(entry, str)]
            else:
                self.log_message("Error loading IP history: expected a JSON list")
    except Exception as e:
        self.log_message(f"Error loading IP history: {e}")
        history = []
    self.ip_history = history
def save_ip_history(self):
    """Write ``self.ip_history`` to the history file as JSON; log any failure."""
    try:
        with open(self.config.history_file, "w") as history_out:
            json.dump(self.ip_history, history_out)
    except Exception as exc:
        self.log_message(f"Error saving IP history: {exc}")
def add_to_history(self, ip_prefix: str):
    """Record ``ip_prefix`` as the newest history entry.

    Empty values and prefixes already present are ignored. The history is
    capped at the 15 most recent entries, then saved and re-displayed.
    """
    already_known = ip_prefix in self.ip_history
    if not ip_prefix or already_known:
        return
    self.ip_history.insert(0, ip_prefix)
    max_entries = 15  # Keep only the latest 15 IPs
    self.ip_history = self.ip_history[:max_entries]
    self.save_ip_history()
    self.update_history_display()
def update_history_display(self):
    """Refresh the history combobox from ``self.ip_history``.

    The newest entry (index 0) becomes the displayed selection when the
    history is not empty.
    """
    history = self.ip_history
    self.history_combo["values"] = history
    if not history:
        return
    self.history_combo.set(history[0])
def on_history_selected(self, event):
    """Apply the IP prefix chosen in the history combobox.

    Args:
        event: Tk event object (unused).

    Nothing happens when the selection is empty or has fewer than three
    dot-separated fields.
    """
    chosen = self.history_var.get()
    if not chosen:
        return
    octets = chosen.split(".")
    if len(octets) < 3:
        return
    # Ensure we only keep the first three octets
    ip_prefix = ".".join(octets[:3])
    self.ip_prefix.set(ip_prefix)
    self.config.last_ip_prefix = ip_prefix
    self.config.save_config()
    self.log_message(f"Selected IP prefix from history: {ip_prefix}")
    # Update ping target for the new IP prefix
    self.update_ping_target(ip_prefix)
def refresh_interfaces(self):
    """Re-read the network interfaces and repopulate the interface combobox.

    Uses the WMI reader when WMI_AVAILABLE is true, otherwise the netsh
    reader. Any failure is logged and shown to the user via show_error().
    """
    self.interfaces.clear()
    try:
        if WMI_AVAILABLE:
            self._refresh_interfaces_wmi()
        else:
            self._refresh_interfaces_netsh()
        # Update the combobox with the active interfaces only
        active_names = [nic.name for nic in self.interfaces if nic.active]
        self.if_combo["values"] = active_names
        # Re-select the last used interface if it is still available
        previous = self.config.last_interface
        if previous in self.if_combo["values"]:
            self.if_combo.set(previous)
            self.on_interface_selected()
        self.log_message("Interfaces refreshed successfully")
    except Exception as e:
        failure = f"Error refreshing interfaces: {str(e)}"
        self.log_message(failure)
        self.show_error(failure)
def _refresh_interfaces_wmi(self):
    """Append one NetworkInterface per IP-enabled physical adapter found via WMI.

    Each adapter is paired with the first adapter configuration that has the
    same InterfaceIndex; adapters with no configuration, or whose
    configuration is not IP-enabled, are skipped.

    Raises:
        Exception: "WMI refresh error: ..." wrapping any failure.
    """
    try:
        adapters = self.wmi_client.Win32_NetworkAdapter(PhysicalAdapter=True)
        configs = self.wmi_client.Win32_NetworkAdapterConfiguration()
        for adapter in adapters:
            matching = None
            for candidate in configs:
                if candidate.InterfaceIndex == adapter.InterfaceIndex:
                    matching = candidate
                    break
            if not (matching and matching.IPEnabled):
                continue
            # Only the first entry of each address list is used
            self.interfaces.append(
                NetworkInterface(
                    name=adapter.NetConnectionID,
                    description=adapter.Description,
                    ip_address=matching.IPAddress[0] if matching.IPAddress else "",
                    subnet_mask=matching.IPSubnet[0] if matching.IPSubnet else "",
                    gateway=(
                        matching.DefaultIPGateway[0]
                        if matching.DefaultIPGateway
                        else ""
                    ),
                    dhcp_enabled=matching.DHCPEnabled,
                    active=adapter.NetEnabled,
                    adapter_id=adapter.GUID,
                )
            )
    except Exception as e:
        raise Exception(f"WMI refresh error: {str(e)}")
def _refresh_interfaces_netsh(self):
    """Append one NetworkInterface per section of ``netsh interface ipv4 show config``.

    The output is split on blank lines and each section is parsed with
    patterns for the Spanish-language netsh labels; sections without a
    recognised header line are skipped. Every parsed interface is marked
    active. When the DHCP line is missing, DHCP is assumed to be enabled.

    Raises:
        Exception: "Error executing netsh: ..." when the command exits with
            a non-zero status, "Error parsing netsh output: ..." for any
            other failure. The original exception is chained as the cause.
    """
    try:
        output = subprocess.check_output(
            "netsh interface ipv4 show config", shell=True, text=True
        )
        configs = output.split("\n\n")
        for config in configs:
            if not config.strip():
                continue
            name_match = re.search(r"Configuración de\s+\"(.+?)\"", config)
            if not name_match:
                continue
            name = name_match.group(1)
            ip_match = re.search(r"Dirección IP:\s+(\d+\.\d+\.\d+\.\d+)", config)
            mask_match = re.search(
                r"Máscara de subred:\s+(\d+\.\d+\.\d+\.\d+)", config
            )
            gateway_match = re.search(
                r"Puerta de enlace predeterminada:\s+(\d+\.\d+\.\d+\.\d+)", config
            )
            dhcp_match = re.search(r"DHCP habilitado:\s+(\w+)", config)
            # \w+ never matches an empty string, so compare against the
            # affirmative answers (Spanish, unaccented Spanish, English).
            dhcp_enabled = (
                dhcp_match.group(1).lower() in ("sí", "si", "yes")
                if dhcp_match
                else True
            )
            interface = NetworkInterface(
                name=name,
                description=name,
                ip_address=ip_match.group(1) if ip_match else "",
                subnet_mask=mask_match.group(1) if mask_match else "",
                gateway=gateway_match.group(1) if gateway_match else "",
                dhcp_enabled=dhcp_enabled,
                active=True,
            )
            self.interfaces.append(interface)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Error executing netsh: {str(e)}") from e
    except Exception as e:
        raise Exception(f"Error parsing netsh output: {str(e)}") from e
def on_interface_selected(self, event=None):
    """Show the selected interface's settings and remember the selection.

    Args:
        event: Tk event object when triggered by the combobox (unused).

    Does nothing when the selected name matches no known interface.
    """
    selected = self.if_var.get()
    interface = None
    for candidate in self.interfaces:
        if candidate.name == selected:
            interface = candidate
            break
    if not interface:
        return
    self.current_ip_var.set(interface.ip_address)
    self.current_mask_var.set(interface.subnet_mask)
    self.current_gateway_var.set(interface.gateway)
    dhcp_label = "Enabled" if interface.dhcp_enabled else "Disabled"
    self.dhcp_status_var.set(dhcp_label)
    # Update the IP prefix if the interface has an IP address
    if interface.ip_address:
        octets = interface.ip_address.split(".")
        prefix = ".".join(octets[:3])
        self.ip_prefix.set(prefix)
        # Update ping target for the new IP prefix
        self.update_ping_target(prefix)
    # Save the selected interface
    self.config.last_interface = selected
    self.config.save_config()
    self.log_message(f"Selected interface: {selected}")
    # Update scan IP range based on current IP and subnet mask
    if interface.ip_address and interface.subnet_mask:
        self.update_scan_ip_range(interface.ip_address, interface.subnet_mask)
def validate_ip_input(self) -> tuple[bool, str]:
    """Validate the prefix, last octet and subnet mask entered by the user.

    Returns:
        ``(True, ip)`` with the assembled address when everything is valid,
        otherwise ``(False, message)`` describing the first problem found.
    """
    try:
        prefix = self.ip_prefix.get().strip()
        last_octet = self.last_octet.get().strip()
        subnet_mask = self.subnet_mask.get().strip()
        # Validate the prefix format
        if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}", prefix) is None:
            return False, "IP prefix must be in format: xxx.xxx.xxx"
        # Validate the last octet
        octet_ok = last_octet.isdigit() and 0 <= int(last_octet) <= 255
        if not octet_ok:
            return False, "Last octet must be between 0 and 255"
        # Validate the subnet mask
        mask_ok = self.is_valid_subnet_mask(subnet_mask)
        if not mask_ok:
            return False, "Invalid subnet mask format"
        # Build the full address and let ipaddress do the final check
        ip = f"{prefix}.{last_octet}"
        ipaddress.IPv4Address(ip)
    except ValueError as e:
        return False, f"Invalid IP address: {str(e)}"
    return True, ip
def execute_admin_script(
    self,
    interface: str,
    mode: str,
    debug: bool = True,
    subnet_mask: str = "255.255.255.0",
) -> bool:
    """Run ``ip-changer-admin.py`` elevated to apply a static IP or DHCP.

    Args:
        interface: Name of the network interface to configure.
        mode: Either the static IP address to set, or the string "dhcp".
        debug: When true, log the paths and the full command line.
        subnet_mask: Mask used for a static IP (ignored for "dhcp").

    Returns:
        True when ShellExecuteW reports success (return value > 32), i.e.
        the elevated process was launched. False when the script is missing,
        the launch failed, or an exception occurred. All outcomes are logged.
    """
    # Resolve absolute paths
    current_dir = os.path.dirname(os.path.abspath(__file__))
    script_path = os.path.join(current_dir, "ip-changer-admin.py")
    python_exe = sys.executable
    # Verify that the admin script exists
    if not os.path.exists(script_path):
        error_msg = f"Admin script not found at: {script_path}"
        self.log_message(error_msg)
        return False
    # Build the argument line
    if mode != "dhcp":
        # For static IP, create the netsh command with subnet mask.
        # The gateway is always assumed to be <first three octets>.1
        gateway = f"{self.get_ip_prefix(mode)}.1"
        netsh_cmd = f'netsh interface ip set address "{interface}" static {mode} {subnet_mask} {gateway}'
        # NOTE(review): netsh_cmd contains double quotes and is itself
        # wrapped in double quotes here; confirm the admin script receives
        # it intact for interface names that contain spaces.
        args = f'"{script_path}" "{interface}" "{netsh_cmd}"'
    else:
        # For DHCP, keep it simple
        args = f'"{script_path}" "{interface}" "dhcp"'
    if debug:
        self.log_message("Debug Information:")
        self.log_message(f"Current Directory: {current_dir}")
        self.log_message(f"Script Path: {script_path}")
        self.log_message(f"Python Executable: {python_exe}")
        self.log_message(f"Full Command: {python_exe} {args}")
    try:
        self.log_message(
            "Attempting to execute admin script with elevated privileges"
        )
        ret = ctypes.windll.shell32.ShellExecuteW(
            None,  # hwnd
            "runas",  # operation
            python_exe,  # file
            args,  # parameters
            current_dir,  # directory
            1,  # show command
        )
        result_code = int(ret)
        # ShellExecuteW signals success with a value greater than 32
        if result_code > 32:
            self.log_message(
                f"ShellExecuteW successful (return code: {result_code})"
            )
            return True
        error_codes = {
            0: "The operating system is out of memory or resources",
            2: "The specified file was not found",
            3: "The specified path was not found",
            5: "Access denied",
            8: "Insufficient memory to complete the operation",
            11: "Bad format",
            26: "A sharing violation occurred",
            27: "The file name association is incomplete or invalid",
            28: "The DDE operation timed out",
            29: "The DDE operation failed",
            30: "The DDE operation is busy",
            31: "The file name association is unavailable",
            32: "No application is associated with the file",
        }
        # Translate the code; previously this referenced an unbound name
        error_msg = error_codes.get(result_code, "Unknown error")
        self.log_message(
            f"Failed to execute admin script. Error code {result_code}: {error_msg}"
        )
        return False
    except Exception as e:
        self.log_message(f"Exception while executing admin script: {str(e)}")
        return False
def save_previous_config(self, interface_name: str):
    """Store the interface's current settings so they can be restored later.

    Args:
        interface_name: Name of the interface about to be changed. Nothing
            is saved when no known interface has this name.
    """
    for interface in self.interfaces:
        if interface.name == interface_name:
            break
    else:
        return
    if not interface:
        return
    snapshot = {}
    snapshot["name"] = interface.name
    snapshot["ip_address"] = interface.ip_address
    snapshot["subnet_mask"] = interface.subnet_mask
    snapshot["gateway"] = interface.gateway
    snapshot["dhcp_enabled"] = interface.dhcp_enabled
    self.config.previous_config = snapshot
    self.config.save_config()
    self.log_message(f"Previous configuration saved for {interface.name}")
def restore_previous(self):
    """Re-apply the interface configuration stored in ``self.config.previous_config``.

    DHCP is re-enabled when the saved configuration used DHCP (or does not
    say); otherwise the saved static IP is applied together with the saved
    subnet mask. Problems are reported through show_error().
    """
    prev_config = self.config.previous_config
    if not prev_config:
        self.show_error("No previous configuration available")
        return
    interface_name = prev_config.get("name")
    if not interface_name:
        self.show_error("Invalid previous configuration")
        return
    # Verify that the interface still exists
    if not any(inf.name == interface_name for inf in self.interfaces):
        self.show_error(f"Interface {interface_name} not found")
        return
    self.log_message(f"Restoring previous configuration for {interface_name}")
    if prev_config.get("dhcp_enabled", True):
        # The previous configuration was DHCP
        if self.execute_admin_script(interface_name, "dhcp"):
            self.log_message(f"Successfully restored DHCP on {interface_name}")
            time.sleep(2)
            self.refresh_interfaces()
        else:
            self.show_error("Failed to restore DHCP configuration")
        return
    # The previous configuration was a static IP
    ip = prev_config.get("ip_address")
    if not ip:
        self.show_error("Previous IP address not available")
        return
    # Re-apply the saved mask instead of silently using the /24 default.
    # NOTE: execute_admin_script() derives the gateway from the IP prefix,
    # so the saved gateway value is not re-applied here.
    subnet_mask = prev_config.get("subnet_mask") or "255.255.255.0"
    if self.execute_admin_script(interface_name, ip, subnet_mask=subnet_mask):
        self.log_message(f"Successfully restored IP {ip} on {interface_name}")
        time.sleep(2)
        self.refresh_interfaces()
    else:
        self.show_error("Failed to restore static IP configuration")
def do_ping(self):
    """Start a ping of the target entered in the UI on a background thread.

    An empty target produces an error dialog. Otherwise the target is saved
    for the current prefix and either the continuous worker or the one-shot
    worker is started, depending on the continuous-ping setting.
    """
    target = self.ping_target.get().strip()
    if not target:
        self.show_error("Please enter a target IP or hostname")
        return

    # Remember the target every time it is used.
    self.save_current_ping_target()

    if self.continuous_ping.get():
        self.log_message(f"Starting continuous ping to {target}...")
        # Arm the stop event and mark the loop as running before it starts.
        self.ping_stop_event.clear()
        self.ping_running = True
        worker = self._execute_continuous_ping
    else:
        self.log_message(f"Pinging {target}...")
        worker = self._execute_ping

    ping_thread = threading.Thread(target=worker, args=(target,), daemon=True)
    ping_thread.start()
def stop_ping(self):
    """Signal a running continuous ping to stop; do nothing otherwise."""
    if not self.ping_running:
        return
    self.log_message("Stopping continuous ping...")
    self.ping_stop_event.set()
    self.ping_running = False
    # Clear the displayed round-trip time once pinging is stopped.
    self.last_ping_time.set("N/A")
def _execute_continuous_ping(self, target: str):
"""Execute continuous ping until stopped"""
try:
self.log_message(
f"Continuous ping to {target} started. Press Stop to terminate."
)
count = 0
while not self.ping_stop_event.is_set():
count += 1
if PING_LIBRARY_AVAILABLE:
try:
# Send a single ping
response = pythonping(target, count=1, timeout=1)
# Check if any reply was successful
if any(reply.success for reply in response):
# Get the first successful reply
for reply in response:
if reply.success:
rtt = reply.time_elapsed_ms
# Update the last ping time field
self.last_ping_time.set(f"{rtt:.1f} ms")
self.log_message(
f"Reply from {target}: time={rtt:.2f}ms (seq={count})"
)
break
else:
self.last_ping_time.set("Timeout")
self.log_message(f"Request timed out (seq={count})")
except Exception as ping_error:
self.last_ping_time.set("Error")
self.log_message(f"Ping error: {str(ping_error)} (seq={count})")
else:
# Fallback to socket
try:
start_time = time.time()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
result = s.connect_ex((target, 80))
elapsed_time = (time.time() - start_time) * 1000 # ms
s.close()
if result == 0:
# Update the last ping time field
self.last_ping_time.set(f"{elapsed_time:.1f} ms")
self.log_message(
f"Connected to {target}:80 in {elapsed_time:.2f}ms (seq={count})"
)
else:
self.last_ping_time.set("Failed")
self.log_message(
f"Failed to connect to {target}:80 (seq={count})"
)
except Exception as socket_error:
self.last_ping_time.set("Error")
self.log_message(
f"Connection error: {str(socket_error)} (seq={count})"
)
# Wait 1 second before next ping
for _ in range(
10
): # Check for stop every 100ms for more responsive stopping
if self.ping_stop_event.is_set():
break
time.sleep(0.1)
self.log_message(
f"Continuous ping to {target} stopped after {count} pings."
)
except Exception as e:
self.log_message(f"Error in continuous ping: {str(e)}")
finally:
self.ping_running = False
self.last_ping_time.set("N/A") # Reset on stop
def _execute_ping(self, target: str):
    """Send four probes to ``target``, log each result and summary statistics.

    Uses ``pythonping`` when ``PING_LIBRARY_AVAILABLE`` is true; otherwise
    times four TCP connections to port 80, which is not a real ICMP ping.
    ``self.last_ping_time`` shows "Testing..." while running and ends as the
    most recent successful time, "Timeout" (no ICMP reply), "Failed" (no TCP
    connection succeeded) or "Error". Afterwards the hostname, MAC address
    and MAC vendor of the target are looked up and logged when available.
    """
    try:
        self.last_ping_time.set("Testing...")
        self.log_message(f"Pinging {target} with 4 echo requests...", "PING")
        if PING_LIBRARY_AVAILABLE:
            response = pythonping(target, count=4, timeout=1)
            # Round-trip times of the successful replies, in arrival order.
            rtts = []
            for i, reply in enumerate(response):
                if reply.success:
                    rtt = reply.time_elapsed_ms
                    rtts.append(rtt)
                    self.log_message(
                        f"Reply from {target}: time={rtt:.2f}ms", "PING"
                    )
                    # Show the most recent successful round-trip time.
                    self.last_ping_time.set(f"{rtt:.1f} ms")
                else:
                    self.log_message(f"Request timed out (seq={i+1})", "PING")
            if not rtts:
                self.last_ping_time.set("Timeout")
            # Summary statistics.
            success_count = len(rtts)
            loss_percentage = (4 - success_count) * 25
            self.log_message(f"\nPing statistics for {target}:")
            self.log_message(
                f" Packets: Sent = 4, Received = {success_count}, Lost = {4 - success_count} ({loss_percentage}% loss)"
            )
            if rtts:
                min_rtt = min(rtts)
                max_rtt = max(rtts)
                avg_rtt = sum(rtts) / success_count
                self.log_message("Approximate round trip times in milliseconds:")
                self.log_message(
                    f" Minimum = {min_rtt:.2f}ms, Maximum = {max_rtt:.2f}ms, Average = {avg_rtt:.2f}ms"
                )
        else:
            # Fallback when PythonPing is not available.
            self.log_message("Using socket connection test (not a true ICMP ping)")
            times = []
            for i in range(4):
                try:
                    start_time = time.time()
                    # The context manager closes the socket even when
                    # connect_ex raises (for example on a name lookup failure).
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                        s.settimeout(1)
                        result = s.connect_ex((target, 80))
                        elapsed_time = (time.time() - start_time) * 1000  # ms
                    if result == 0:
                        times.append(elapsed_time)
                        self.log_message(
                            f"Connected to {target}:80 in {elapsed_time:.2f}ms"
                        )
                        # Keep the display in step with the ICMP branch.
                        self.last_ping_time.set(f"{elapsed_time:.1f} ms")
                    else:
                        self.log_message(
                            f"Failed to connect to {target}:80 (seq={i+1})"
                        )
                except Exception as conn_error:
                    self.log_message(
                        f"Connection error: {str(conn_error)} (seq={i+1})"
                    )
            if not times:
                # Without this the display would stay on "Testing..." forever.
                self.last_ping_time.set("Failed")
            # Summary statistics.
            success_count = len(times)
            loss_percentage = (4 - success_count) * 25
            self.log_message(f"\nConnection statistics for {target}:")
            self.log_message(
                f" Attempts: Sent = 4, Successful = {success_count}, Failed = {4 - success_count} ({loss_percentage}% loss)"
            )
            if times:
                self.log_message("Approximate connection times in milliseconds:")
                self.log_message(
                    f" Minimum = {min(times):.2f}ms, Maximum = {max(times):.2f}ms, Average = {sum(times)/len(times):.2f}ms"
                )
        # Best-effort extra details about the target.
        hostname = self.get_hostname(target)
        mac_address = self.get_mac_address(target)
        vendor = ""
        if mac_address and MAC_LOOKUP_AVAILABLE:
            vendor = self.get_mac_vendor(mac_address)
        if hostname:
            self.log_message(f"Hostname: {hostname}", "HOSTNAME")
        if mac_address:
            self.log_message(f"MAC Address: {mac_address}", "MAC")
        if vendor:
            self.log_message(f"MAC Vendor: {vendor}", "VENDOR")
    except Exception as e:
        self.last_ping_time.set("Error")
        self.log_message(f"Error executing ping: {str(e)}", "ERROR")
def start_scan(self):
    """Validate the scan range from the UI and start a background scan.

    Shows an error when either address is not a valid IPv4 address or when
    the start address is greater than the end address. Otherwise previous
    results are cleared, the progress widgets are reset and ``_execute_scan``
    is started on a daemon thread.
    """
    start_ip = self.scan_start_ip.get().strip()
    end_ip = self.scan_end_ip.get().strip()
    try:
        first = ipaddress.IPv4Address(start_ip)
        last = ipaddress.IPv4Address(end_ip)
        if first > last:
            self.show_error("Start IP must be lower than or equal to End IP")
            return

        # Drop the rows and the result list left over from an earlier scan.
        for row in self.scan_results_tree.get_children():
            self.scan_results_tree.delete(row)
        self.scan_results = []

        # The address count sizes the progress bar and the node counter.
        ip_range = int(last) - int(first) + 1
        self.scan_progress_var.set(0)
        self.scan_progress["maximum"] = ip_range
        self.nodes_to_scan.set(str(ip_range))

        self.log_message(
            f"Starting scan from {start_ip} to {end_ip} ({ip_range} addresses)..."
        )

        # Arm the stop event and mark the scan as running before it starts.
        self.scan_stop_event.clear()
        self.scan_running = True

        scanner = threading.Thread(
            target=self._execute_scan, args=(first, last, ip_range), daemon=True
        )
        scanner.start()
    except ValueError as e:
        self.show_error(f"Invalid IP address: {str(e)}")
def stop_scan(self):
    """Ask a running scan to stop and briefly wait for its worker threads."""
    if not self.scan_running:
        return
    self.log_message("Stopping network scan...")
    self.scan_stop_event.set()
    # Give each worker that is still alive up to half a second to finish.
    for worker in (t for t in self.scan_threads if t.is_alive()):
        worker.join(0.5)
    self.scan_running = False
    self.log_message("Network scan stopped.")
def _execute_scan(self, start_ip, end_ip, ip_range):
"""Execute the network scan - now only performs ping scan"""
try:
# Empty the queue
while not self.scan_queue.empty():
self.scan_queue.get_nowait()
# Clear threads list
self.scan_threads = []
# Reset counter for finished IPs
self.scan_completed = 0
# Clear previous scan results tree
for item in self.scan_results_tree.get_children():
self.scan_results_tree.delete(item)
# Fill queue with all IPs in range
for ip_int in range(int(start_ip), int(end_ip) + 1):
self.scan_queue.put(ipaddress.IPv4Address(ip_int))
# Start worker threads for parallel scanning
max_threads = min(
20, ip_range
) # Limit to 20 threads or fewer if range is small
for i in range(max_threads):
thread = threading.Thread(target=self._scan_worker, daemon=True)
thread.start()
self.scan_threads.append(thread)
# Wait for threads to complete or be stopped
while self.scan_running and any(t.is_alive() for t in self.scan_threads):
time.sleep(0.1)
if not self.scan_stop_event.is_set():
self.log_message(
f"Scan completed. Found {len(self.scan_results)} active hosts. Use 'Get Host Info' to lookup hostnames and MAC addresses."
)
except Exception as e:
self.log_message(f"Error in network scan: {str(e)}")
finally:
self.scan_running = False
def _scan_worker(self):
"""Worker thread function to scan IPs from the queue - now with ping time recording"""
while not self.scan_stop_event.is_set():
try:
# Get next IP from queue with timeout
try:
ip_address = self.scan_queue.get(timeout=0.1)
except queue.Empty:
# No more IPs to scan
break
# Ping the IP and measure the time
ping_time = 0
ip_str = str(ip_address)
is_active, ping_delay = self._ping_host_with_time(ip_str)
# If active, add to results - but don't get host info yet
if is_active:
self.log_message(
f"Host discovered: {ip_str} ({ping_delay:.1f}ms)", "DISCOVERY"
)
self.scan_results.append(ip_str)
# Add to results tree with placeholder values and ping time
self.master.after(
0,
lambda ip=ip_str, delay=ping_delay: self.add_scan_result(
ip, delay, "", "", ""
),
)
# Update progress
self.scan_completed = self.scan_completed + 1
self.scan_progress_var.set(self.scan_completed)
# Mark task as done
self.scan_queue.task_done()
except Exception as e:
self.log_message(f"Error in scan worker: {str(e)}")
def gather_host_information(self):
    """Start a background lookup of hostname and MAC for discovered hosts.

    Shows an informational dialog instead when no scan results exist yet.
    """
    if self.scan_results:
        # Do the lookups off the calling thread.
        gatherer = threading.Thread(
            target=self._execute_host_gathering, daemon=True
        )
        gatherer.start()
        return
    self.show_info("No hosts discovered yet. Run a scan first.")
def _execute_host_gathering(self):
"""Perform the actual host information gathering"""
try:
self.log_message(
f"Gathering information for {len(self.scan_results)} hosts...", "INFO"
)
# Reset progress bar for this operation
total_hosts = len(self.scan_results)
self.scan_progress_var.set(0)
self.scan_progress["maximum"] = total_hosts
# Get information for each host
for i, ip in enumerate(self.scan_results):
if self.scan_stop_event.is_set():
self.log_message("Host information gathering stopped.", "WARN")
break
self.log_message(
f"Getting information for host {ip} ({i+1}/{total_hosts})..."
)
# Get hostname and MAC
hostname = self.get_hostname(ip)
mac_address = self.get_mac_address(ip)
vendor = ""
# Look up vendor information if we have a MAC address
if mac_address and MAC_LOOKUP_AVAILABLE:
vendor = self.get_mac_vendor(mac_address)
# Log the results in a cleaner format
result_parts = []
if hostname:
result_parts.append(f"Hostname: {hostname}")
if mac_address:
result_parts.append(f"MAC: {mac_address}")
if vendor:
result_parts.append(f"Vendor: {vendor}")
if result_parts:
self.log_message(f" {ip}" + " | ".join(result_parts))
else:
self.log_message(f" {ip} → No additional information found")
# Update the tree with vendor information
self.update_host_in_tree(ip, hostname, mac_address, vendor)
# Update progress
self.scan_progress_var.set(i + 1)
self.log_message("Host information gathering completed.", "SUCCESS")
except Exception as e:
self.log_message(f"Error gathering host information: {str(e)}", "ERROR")
def update_host_in_tree(self, ip, hostname, mac, vendor=""):
    """Fill in hostname, MAC and vendor on the first tree row showing ``ip``.

    The row's existing delay column is kept. Missing values are shown as
    "Not resolved", "Not found" and "Unknown"; the MAC is upper-cased.
    Nothing changes when no row matches. Errors are logged, not raised.
    """
    try:
        tree = self.scan_results_tree
        rows = ((row_id, tree.item(row_id, "values")) for row_id in tree.get_children())
        hit = next(
            ((row_id, cells) for row_id, cells in rows if cells and cells[0] == ip),
            None,
        )
        if hit is None:
            return
        row_id, cells = hit

        # Keep whatever delay the scan recorded for this row.
        delay = cells[1] if len(cells) > 1 else ""
        hostname_str = hostname or "Not resolved"
        mac_str = mac.upper() if mac else "Not found"
        vendor_str = vendor or "Unknown"
        tree.item(row_id, values=(ip, delay, hostname_str, mac_str, vendor_str))
    except Exception as e:
        self.log_message(f"Error updating host in treeview: {str(e)}")
def add_scan_result(self, ip, delay=0, hostname="", mac="", vendor=""):
    """Append one row to the scan results tree.

    Empty hostname, MAC and vendor are shown as "Not resolved", "Not found"
    and "Unknown". A positive delay is shown with one decimal place; any
    other delay is shown as an empty cell. Errors are logged, not raised.
    """
    try:
        row = (
            ip,
            f"{delay:.1f}" if delay > 0 else "",
            hostname or "Not resolved",
            mac or "Not found",
            vendor or "Unknown",
        )
        self.scan_results_tree.insert("", "end", values=row)
    except Exception as e:
        self.log_message(f"Error adding scan result to UI: {str(e)}")
def _ping_host_with_time(self, host: str) -> tuple[bool, float]:
    """Probe ``host`` once and return ``(is_active, delay_in_ms)``.

    Uses ``pythonping`` with a 0.5 s timeout when ``PING_LIBRARY_AVAILABLE``
    is true; otherwise times a TCP connection to port 80. Every failure,
    including an exception, is reported as ``(False, 0)`` so scan workers
    never see an exception from this call.
    """
    try:
        if PING_LIBRARY_AVAILABLE:
            # Short timeout: this is called once per address during a scan.
            response = pythonping(host, count=1, timeout=0.5)
            if not response.success():
                return False, 0
            # Elapsed time of the first successful reply (0 if none is found).
            ping_time = next(
                (reply.time_elapsed_ms for reply in response if reply.success), 0
            )
            return True, ping_time

        # Fallback: time a TCP connect to a commonly open port.
        start_time = time.time()
        # The context manager closes the socket even when connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.5)
            result = s.connect_ex((host, 80))
            ping_time = (time.time() - start_time) * 1000  # ms
        if result == 0:
            return True, ping_time
        return False, 0
    except Exception:
        # Best effort: an unreachable or unresolvable host is simply inactive.
        # Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit
        # propagate.
        return False, 0
def set_static_ip(self):
    """Apply the static IP entered in the UI to the selected interface.

    Requires a selected interface and an address accepted by
    ``validate_ip_input``. The current configuration is saved first, the
    prefix is added to the history, and the change is made through
    ``execute_admin_script``. The interface list is refreshed on success;
    an error dialog is shown on failure.
    """
    interface = self.if_var.get()
    if not interface:
        self.show_error("Please select a network interface")
        return

    # Remember the current configuration before changing anything.
    self.save_previous_config(interface)

    ok, outcome = self.validate_ip_input()
    if not ok:
        # On failure the second item is the error message.
        self.show_error(outcome)
        return
    ip = outcome
    subnet_mask = self.subnet_mask.get()

    # Record the prefix in the history.
    self.add_to_history(self.ip_prefix.get())

    # Log the command line that corresponds to this change.
    gateway = f"{self.ip_prefix.get()}.1"
    command = f'netsh interface ip set address "{interface}" static {ip} {subnet_mask} {gateway}'
    self.log_message(f"Executing network command: {command}")

    applied = self.execute_admin_script(
        interface, ip, debug=True, subnet_mask=subnet_mask
    )
    if not applied:
        self.show_error("Failed to set static IP. Check the log for details.")
        return

    time.sleep(2)  # wait for the change to take effect
    self.refresh_interfaces()
    self.log_message(
        f"Successfully set static IP {ip} with mask {subnet_mask} on {interface}"
    )
def set_dhcp(self):
    """Switch the selected interface to DHCP.

    The current configuration is saved first. The interface list is
    refreshed on success; an error dialog is shown on failure.
    """
    interface = self.if_var.get()
    if not interface:
        self.show_error("Please select a network interface")
        return

    # Remember the current configuration before changing anything.
    self.save_previous_config(interface)

    enabled = self.execute_admin_script(interface, "dhcp", debug=True)
    if not enabled:
        self.show_error("Failed to enable DHCP. Check the log for details.")
        return

    time.sleep(2)  # wait for the change to take effect
    self.refresh_interfaces()
    self.log_message(f"Successfully enabled DHCP on {interface}")
def run_command(self, cmd: str) -> bool:
    """Run ``cmd`` through the shell, streaming its output to the log.

    Each line of standard output is logged as it arrives. Standard error is
    collected while the command runs and logged afterwards under
    "Error output:".

    Returns True when the command exits with status 0, False otherwise or
    when the command could not be started.

    NOTE(review): ``cmd`` is passed to the shell as-is (``shell=True``), so
    callers must not build it from untrusted text.
    """
    self.log_message(f"Executing command: {cmd}")
    try:
        # The context manager closes both pipes and reaps the process.
        with subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        ) as process:
            # Drain stderr on a helper thread while stdout is streamed.
            # Reading stderr only after stdout is exhausted can deadlock:
            # once the child fills the stderr pipe buffer it blocks, and its
            # stdout never reaches end-of-file.
            stderr_chunks = []
            stderr_reader = threading.Thread(
                target=lambda: stderr_chunks.append(process.stderr.read()),
                daemon=True,
            )
            stderr_reader.start()
            for line in process.stdout:
                self.log_message(line.strip())
            retcode = process.wait()
            # Join before leaving the block so stderr is not closed mid-read.
            stderr_reader.join()
        error_output = "".join(stderr_chunks)
        if error_output:
            self.log_message(f"Error output:\n{error_output}")
        if retcode == 0:
            self.log_message("Command executed successfully")
            return True
        self.log_message(f"Command failed with return code: {retcode}")
        return False
    except Exception as e:
        self.log_message(f"Error executing command: {str(e)}")
        return False
def is_admin(self):
    """Return the result of the Windows ``IsUserAnAdmin`` shell call.

    When the call cannot be made (any exception), the error is logged and
    False is returned.
    """
    try:
        admin_flag = ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        self.log_message(f"Error checking admin privileges: {str(e)}")
        return False
    return admin_flag
def elevate_privileges(self):
    """Relaunch this script with administrator rights (Windows only).

    Tells the user a UAC prompt is coming, then starts the script again with
    the "runas" verb. The current instance quits only when the relaunch was
    accepted. If it fails or is refused, the error is logged, an error dialog
    is shown and the application keeps running. Does nothing on other
    platforms.
    """
    if sys.platform != "win32":
        return

    self.show_info(
        "The application needs administrator privileges to change network settings. "
        "Please confirm the UAC prompt."
    )
    script_path = os.path.abspath(sys.argv[0])
    try:
        result = ctypes.windll.shell32.ShellExecuteW(
            None, "runas", sys.executable, f'"{script_path}"', None, 1
        )
        # ShellExecuteW reports success with a value greater than 32.
        # Anything else means no elevated process was started (for example
        # the user declined the prompt), so this instance must stay open.
        if result <= 32:
            raise OSError(f"ShellExecuteW returned error code {result}")
        self.master.quit()
    except Exception as e:
        self.log_message(f"Error elevating privileges: {str(e)}")
        self.show_error("Failed to elevate privileges")
def show_error(self, message: str):
    """Log ``message`` with the ERROR tag and show it in an error dialog."""
    log_line = f"ERROR: {message}"
    self.log_message(log_line, "ERROR")
    messagebox.showerror(title="Error", message=message)
def show_info(self, message: str):
    """Log ``message`` with the INFO tag and show it in an information dialog."""
    log_line = f"INFO: {message}"
    self.log_message(log_line, "INFO")
    messagebox.showinfo(title="Information", message=message)
# Subnet mask / CIDR prefix conversion and synchronization helpers
def on_subnet_mask_changed(self, *args):
    """Mirror a subnet mask edit into the CIDR field and the scan range.

    Does nothing while a previous call is still in progress (tracked by
    ``self._updating_mask``) or when the mask is not valid. Errors are
    logged, not raised.
    """
    # Guard flag: an update already in progress must not re-enter.
    if getattr(self, "_updating_mask", False):
        return
    try:
        self._updating_mask = True
        mask = self.subnet_mask.get()
        if not self.is_valid_subnet_mask(mask):
            return
        prefix_len = self.subnet_mask_to_cidr(mask)
        self.cidr_bits.set(str(prefix_len))
        # Recompute the scan range only when an IP is currently known.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)
    except Exception as e:
        self.log_message(f"Error updating CIDR bits: {str(e)}")
    finally:
        self._updating_mask = False
def on_cidr_bits_changed(self, *args):
    """Mirror a CIDR prefix edit into the subnet mask field and scan range.

    Does nothing while a previous call is still in progress (tracked by
    ``self._updating_bits``), or when the field is not a number between 0
    and 32. Errors are logged, not raised.
    """
    # Guard flag: an update already in progress must not re-enter.
    if getattr(self, "_updating_bits", False):
        return
    try:
        self._updating_bits = True
        bits_str = self.cidr_bits.get()
        if not bits_str.isdigit():
            return
        prefix_len = int(bits_str)
        if prefix_len < 0 or prefix_len > 32:
            return
        mask = self.cidr_to_subnet_mask(prefix_len)
        self.subnet_mask.set(mask)
        # Recompute the scan range only when an IP is currently known.
        current_ip = self.current_ip_var.get()
        if current_ip:
            self.update_scan_ip_range(current_ip, mask)
    except Exception as e:
        self.log_message(f"Error updating subnet mask: {str(e)}")
    finally:
        self._updating_bits = False
def is_valid_subnet_mask(self, mask: str) -> bool:
    """Return True when ``mask`` is a dotted-quad mask with contiguous 1 bits.

    The mask must have four decimal octets in the range 0-255 whose 32-bit
    binary form is a run of ones followed only by zeros. Any error while
    checking yields False.
    """
    try:
        octets = mask.split(".")
        if len(octets) != 4:
            return False
        if any(not o.isdigit() or not 0 <= int(o) <= 255 for o in octets):
            return False
        bit_string = "".join(f"{int(o):08b}" for o in octets)
        # A zero followed by a one means the ones are not contiguous.
        return "01" not in bit_string
    except Exception:
        return False
def subnet_mask_to_cidr(self, mask: str) -> int:
    """Return the number of 1 bits in a dotted-decimal subnet mask.

    When the mask cannot be parsed, the error is logged and 24 is returned.
    """
    try:
        # Count the set bits octet by octet.
        return sum(bin(int(octet)).count("1") for octet in mask.split("."))
    except Exception as e:
        self.log_message(f"Error converting subnet mask to CIDR: {str(e)}")
        return 24  # fall back to /24
def cidr_to_subnet_mask(self, bits: int) -> str:
    """Return the dotted-decimal subnet mask for a CIDR prefix length.

    When the conversion fails, the error is logged and "255.255.255.0" is
    returned.
    """
    try:
        # ``bits`` ones followed by zeros; only the first 32 characters are used.
        pattern = "1" * bits + "0" * (32 - bits)
        return ".".join(
            str(int(pattern[pos : pos + 8], 2)) for pos in (0, 8, 16, 24)
        )
    except Exception as e:
        self.log_message(f"Error converting CIDR to subnet mask: {str(e)}")
        return "255.255.255.0"  # fall back to /24
def update_ping_target(self, ip_prefix):
    """Load the ping target stored for ``ip_prefix`` into the target field.

    Does nothing when ``ip_prefix`` is empty.
    """
    if not ip_prefix:
        return
    stored_target = self.config.get_ping_target(ip_prefix)
    self.ping_target.set(stored_target)
def save_current_ping_target(self, event=None):
    """Store the ping target field's value under the current IP prefix.

    Nothing is stored when either the prefix or the target is empty.
    ``event`` is accepted but not used.
    """
    ip_prefix = self.ip_prefix.get()
    target = self.ping_target.get()
    if not (ip_prefix and target):
        return
    self.config.set_ping_target(ip_prefix, target)
    self.log_message(f"Saved ping target {target} for IP prefix {ip_prefix}")
def copy_to_clipboard(self, value):
    """Replace the clipboard contents with ``value`` and log the copy."""
    clipboard_owner = self.master
    clipboard_owner.clipboard_clear()
    clipboard_owner.clipboard_append(value)
    self.log_message(f"Copied to clipboard: {value}")
def copy_current_ip(self):
    """Copy the displayed current IP to the clipboard, or report it is missing."""
    ip = self.current_ip_var.get()
    if not ip:
        self.show_info("No IP address available to copy")
        return
    self.copy_to_clipboard(ip)
def copy_current_gateway(self):
    """Copy the displayed current gateway to the clipboard, or report it is missing."""
    gateway = self.current_gateway_var.get()
    if not gateway:
        self.show_info("No gateway address available to copy")
        return
    self.copy_to_clipboard(gateway)
def update_scan_ip_range(self, ip_address, subnet_mask):
    """Set the scan start/end fields to the subnet containing ``ip_address``.

    The prefix length is the number of 1 bits in ``subnet_mask``. For
    networks with more than two addresses the network and broadcast
    addresses are left out of the range; smaller networks are used in full.
    The scan CIDR field and the node count are updated as well. Errors are
    logged, not raised.
    """
    try:
        # Parsing also rejects malformed input before any field is touched.
        ipaddress.IPv4Address(ip_address)
        mask_value = int(ipaddress.IPv4Address(subnet_mask))
        network_bits = bin(mask_value).count("1")
        self.scan_cidr_bits.set(str(network_bits))

        network = ipaddress.IPv4Network(
            f"{ip_address}/{network_bits}", strict=False
        )
        start_ip = network.network_address
        end_ip = network.broadcast_address
        if network.num_addresses > 2:
            # Leave out the network and broadcast addresses.
            start_ip = start_ip + 1
            end_ip = end_ip - 1

        self.scan_start_ip.set(str(start_ip))
        self.scan_end_ip.set(str(end_ip))

        nodes_count = int(end_ip) - int(start_ip) + 1
        self.nodes_to_scan.set(str(nodes_count))
        self.log_message(
            f"Updated scan range: {start_ip} - {end_ip} ({nodes_count} nodes)"
        )
    except Exception as e:
        self.log_message(f"Error calculating IP range: {str(e)}")
def update_scan_range_from_cidr(self, *args):
    """Recompute the scan End IP from the Start IP and the scan CIDR field.

    The Start IP field is left alone. The End IP becomes the last address of
    the network containing the start address: the broadcast address for
    networks of up to two addresses, otherwise the address before it. Does
    nothing when the start field is empty or the CIDR field is not a number
    between 0 and 32. Errors are logged, not raised.
    """
    try:
        start_ip = self.scan_start_ip.get().strip()
        cidr_bits = self.scan_cidr_bits.get().strip()
        if not start_ip or not cidr_bits.isdigit():
            return

        prefix_len = int(cidr_bits)
        if prefix_len < 0 or prefix_len > 32:
            return

        # The start address selects which network the prefix applies to.
        network = ipaddress.IPv4Network(f"{start_ip}/{prefix_len}", strict=False)
        end_ip = network.broadcast_address
        if network.num_addresses > 2:
            end_ip = end_ip - 1

        self.scan_end_ip.set(str(end_ip))

        start = ipaddress.IPv4Address(start_ip)
        nodes_count = int(end_ip) - int(start) + 1
        self.nodes_to_scan.set(str(nodes_count))
        self.log_message(
            f"Updated scan range from CIDR: {start_ip} - {end_ip} ({nodes_count} nodes)"
        )
    except Exception as e:
        self.log_message(f"Error updating scan range from CIDR: {str(e)}")
def get_hostname(self, ip_address):
    """Return the hostname that ``ip_address`` reverse-resolves to, or "".

    Every failure is swallowed silently and reported as an empty string;
    nothing is logged.
    """
    try:
        # gethostbyaddr returns (hostname, aliases, addresses).
        return socket.gethostbyaddr(ip_address)[0]
    except Exception:
        # Covers socket.herror, socket.gaierror and socket.timeout as well.
        return ""
def get_mac_address(self, ip_address):
    """Return the MAC address the system ARP table holds for ``ip_address``.

    Runs ``arp -a <ip_address>`` with a two second timeout and extracts the
    first MAC-looking token from its output. The result is upper-cased. An
    empty string is returned when no MAC is found, the command fails or
    times out, or any other error occurs; nothing is logged.
    """
    try:
        target = str(ip_address)
        # The target can come straight from a text field. Refuse anything
        # arp would read as an option, and pass it as a single argument
        # without a shell so metacharacters cannot run other commands.
        if not target or target.startswith("-"):
            return ""
        result = subprocess.run(
            ["arp", "-a", target],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0 and result.stdout:
            # First try the usual format with hyphens or colons.
            mac_match = re.search(
                r"([0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2})",
                result.stdout,
                re.IGNORECASE,
            )
            if mac_match:
                return mac_match.group(0).upper()
            # Then a variant separated by whitespace or hyphens.
            mac_match = re.search(
                r"([0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2})",
                result.stdout,
                re.IGNORECASE,
            )
            if mac_match:
                return mac_match.group(0).upper()
            # Last resort: on the line naming the target, accept a second
            # column made of hex pairs joined by any non-word separator.
            for line in result.stdout.strip().split("\n"):
                if target not in line:
                    continue
                parts = line.split()
                if len(parts) >= 2 and re.match(
                    r"([0-9a-f]{2}[^\w]){5}[0-9a-f]{2}",
                    parts[1],
                    re.IGNORECASE,
                ):
                    return parts[1].upper()
        # No MAC found; this is not treated as an error.
        return ""
    except subprocess.TimeoutExpired:
        return ""
    except Exception:
        return ""
def get_mac_vendor(self, mac_address):
    """Return the vendor name registered for ``mac_address``, or "".

    Returns "" straight away when ``MAC_LOOKUP_AVAILABLE`` is false. The
    address is normalized to upper-case, colon-separated pairs before the
    lookup. Any lookup error is logged and reported as "".
    """
    try:
        if not MAC_LOOKUP_AVAILABLE:
            return ""
        # Strip spaces, dashes and colons, then rebuild the address as
        # colon-separated pairs for the lookup.
        bare_hex = mac_address.replace(" ", "")
        bare_hex = bare_hex.replace("-", "")
        bare_hex = bare_hex.replace(":", "")
        bare_hex = bare_hex.upper()
        pairs = []
        for pos in range(0, len(bare_hex), 2):
            pairs.append(bare_hex[pos : pos + 2])
        formatted_mac = ":".join(pairs)
        # NOTE(review): the lookup goes through self.mac_lookup, while the
        # top of the file creates a module-level ``mac_lookup``. Confirm the
        # instance attribute is assigned elsewhere.
        return self.mac_lookup.lookup(formatted_mac)
    except Exception as e:
        self.log_message(f"Error looking up MAC vendor: {str(e)}")
        return ""
def main():
    """Create the Tk root window, build the application and run the event loop."""
    window = tk.Tk()
    # Keep a reference to the application object for the life of the loop.
    app = IPChangerApp(window)
    window.mainloop()


# Start the GUI only when this file is run as a script.
if __name__ == "__main__":
    main()