S7_snap7_Stremer_n_Recorder/main.py

461 lines
15 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for
import snap7
import json
import socket
import time
import logging
import threading
from datetime import datetime
from typing import Dict, Any, Optional
import struct
import os
# Flask application object; every route handler in this file registers on it.
app = Flask(__name__)
# Secret key Flask uses to sign session data. It can be overridden through the
# PLC_STREAMER_SECRET_KEY environment variable so deployments do not have to
# rely on the value committed to source control; the historical literal is
# kept as the fallback so existing setups keep working unchanged.
app.secret_key = os.environ.get("PLC_STREAMER_SECRET_KEY", "plc_streamer_secret_key")
class PLCDataStreamer:
    """Poll variables from a PLC through snap7 and stream them as JSON over UDP.

    The instance keeps the PLC settings, the UDP target and the variable table
    in memory, persists them to JSON files in the working directory, and can
    run a background thread that samples every configured variable at a fixed
    interval and sends the samples as one JSON datagram per cycle.
    """

    # Numeric variable types: type name -> (bytes to read, struct format).
    # All formats are big-endian ("bool" is handled separately).
    _NUMERIC_LAYOUTS = {
        "real": (4, ">f"),
        "int": (2, ">h"),
        "dint": (4, ">l"),
    }

    def __init__(self):
        """Initialize the PLC data streamer.

        Sets defaults, configures logging, then overlays whatever is stored in
        the configuration and variables files.
        """
        # Configuration file paths
        self.config_file = "plc_config.json"
        self.variables_file = "plc_variables.json"
        # Default configuration
        self.plc_config = {"ip": "192.168.1.100", "rack": 0, "slot": 2}
        self.udp_config = {"host": "127.0.0.1", "port": 9870}
        # Configurable variables: name -> {"db", "offset", "type"}
        self.variables = {}
        # System states
        self.plc = None
        self.udp_socket = None
        self.connected = False
        self.streaming = False
        self.stream_thread: Optional[threading.Thread] = None
        self.sampling_interval = 0.1
        # Setup logging first so the loaders below can report problems
        self.setup_logging()
        # Load configuration from files
        self.load_configuration()
        self.load_variables()

    def setup_logging(self):
        """Configure the logging system (file ``plc_data.log`` plus console)."""
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[logging.FileHandler("plc_data.log"), logging.StreamHandler()],
        )
        self.logger = logging.getLogger(__name__)

    def load_configuration(self):
        """Load PLC and UDP configuration from the JSON configuration file.

        Values found in the file are merged over the current settings, so a
        file that only specifies some keys keeps the remaining defaults
        instead of dropping them. Any error is logged and the current
        settings stay in effect.
        """
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, "r", encoding="utf-8") as f:
                    config = json.load(f)
                # Merge instead of replace: later code indexes keys such as
                # plc_config["rack"] directly and would fail on a partial dict.
                self.plc_config = {**self.plc_config, **config.get("plc_config", {})}
                self.udp_config = {**self.udp_config, **config.get("udp_config", {})}
                self.sampling_interval = config.get(
                    "sampling_interval", self.sampling_interval
                )
                self.logger.info(f"Configuration loaded from {self.config_file}")
            else:
                self.logger.info("No configuration file found, using defaults")
        except Exception as e:
            self.logger.error(f"Error loading configuration: {e}")

    def save_configuration(self):
        """Save PLC and UDP configuration to the JSON configuration file."""
        try:
            config = {
                "plc_config": self.plc_config,
                "udp_config": self.udp_config,
                "sampling_interval": self.sampling_interval,
            }
            with open(self.config_file, "w", encoding="utf-8") as f:
                json.dump(config, f, indent=4)
            self.logger.info(f"Configuration saved to {self.config_file}")
        except Exception as e:
            self.logger.error(f"Error saving configuration: {e}")

    def load_variables(self):
        """Load the variable table from the JSON variables file.

        The file must contain a JSON object; anything else is rejected (and
        logged) so the rest of the class can rely on ``self.variables`` being
        a dict.
        """
        try:
            if os.path.exists(self.variables_file):
                with open(self.variables_file, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("variables file must contain a JSON object")
                self.variables = loaded
                self.logger.info(
                    f"Variables loaded from {self.variables_file}: {len(self.variables)} variables"
                )
            else:
                self.logger.info(
                    "No variables file found, starting with empty variables"
                )
        except Exception as e:
            self.logger.error(f"Error loading variables: {e}")

    def save_variables(self):
        """Save the variable table to the JSON variables file."""
        try:
            with open(self.variables_file, "w", encoding="utf-8") as f:
                json.dump(self.variables, f, indent=4)
            self.logger.info(f"Variables saved to {self.variables_file}")
        except Exception as e:
            self.logger.error(f"Error saving variables: {e}")

    def update_plc_config(self, ip: str, rack: int, slot: int):
        """Replace the PLC connection settings and persist them."""
        self.plc_config = {"ip": ip, "rack": rack, "slot": slot}
        self.save_configuration()
        self.logger.info(f"PLC configuration updated: {self.plc_config}")

    def update_udp_config(self, host: str, port: int):
        """Replace the UDP target settings and persist them."""
        self.udp_config = {"host": host, "port": port}
        self.save_configuration()
        self.logger.info(f"UDP configuration updated: {self.udp_config}")

    def update_sampling_interval(self, interval: float):
        """Set the sampling interval (seconds) and persist it."""
        self.sampling_interval = interval
        self.save_configuration()
        self.logger.info(f"Sampling interval updated: {interval}s")

    def add_variable(self, name: str, db: int, offset: int, var_type: str):
        """Add (or overwrite) a variable for polling and persist the table."""
        self.variables[name] = {"db": db, "offset": offset, "type": var_type}
        self.save_variables()
        self.logger.info(f"Variable added: {name} -> DB{db}.{offset} ({var_type})")

    def remove_variable(self, name: str):
        """Remove a variable from polling; unknown names are ignored."""
        if name in self.variables:
            del self.variables[name]
            self.save_variables()
            self.logger.info(f"Variable removed: {name}")

    def connect_plc(self) -> bool:
        """Connect to the PLC described by ``self.plc_config``.

        Any existing client is disconnected first.

        Returns:
            True when the connection succeeded, False otherwise (the error is
            logged and ``self.connected`` is cleared).
        """
        try:
            if self.plc:
                self.plc.disconnect()
            self.plc = snap7.client.Client()
            self.plc.connect(
                self.plc_config["ip"], self.plc_config["rack"], self.plc_config["slot"]
            )
            self.connected = True
            self.logger.info(f"Connected to PLC {self.plc_config['ip']}")
            return True
        except Exception as e:
            self.connected = False
            self.logger.error(f"Error connecting to PLC: {e}")
            return False

    def disconnect_plc(self):
        """Disconnect from the PLC.

        ``self.connected`` is cleared even when the underlying disconnect call
        fails, so the status never keeps reporting a link that was torn down.
        """
        try:
            if self.plc:
                self.plc.disconnect()
            self.logger.info("Disconnected from PLC")
        except Exception as e:
            self.logger.error(f"Error disconnecting from PLC: {e}")
        finally:
            self.connected = False

    def read_variable(self, var_config: Dict[str, Any]) -> Any:
        """Read a single variable from the PLC.

        Args:
            var_config: Mapping with the keys ``"db"``, ``"offset"`` and
                ``"type"`` (one of ``"real"``, ``"int"``, ``"dint"``,
                ``"bool"``).

        Returns:
            The decoded value, or None when the type is unknown or the read
            fails (failures are logged).
        """
        try:
            db = var_config["db"]
            offset = var_config["offset"]
            var_type = var_config["type"]
            if var_type == "bool":
                # Only the least-significant bit of the addressed byte is used.
                raw_data = self.plc.db_read(db, offset, 1)
                return bool(raw_data[0] & 0x01)
            layout = self._NUMERIC_LAYOUTS.get(var_type)
            if layout is None:
                return None
            size, fmt = layout
            raw_data = self.plc.db_read(db, offset, size)
            return struct.unpack(fmt, raw_data)[0]
        except Exception as e:
            self.logger.error(f"Error reading variable: {e}")
            return None

    def read_all_variables(self) -> Dict[str, Any]:
        """Read every configured variable.

        Returns:
            Mapping of variable name to value. Variables whose read returned
            None are left out; an empty dict is returned when no PLC is
            connected.
        """
        if not self.connected or not self.plc:
            return {}
        data = {}
        # Iterate over a snapshot: add_variable/remove_variable can run on a
        # web-request thread while the streaming thread is inside this loop,
        # and changing a dict's size during iteration raises RuntimeError.
        for var_name, var_config in list(self.variables.items()):
            value = self.read_variable(var_config)
            if value is not None:
                data[var_name] = value
        return data

    def setup_udp_socket(self) -> bool:
        """Create a fresh UDP socket, closing any previous one.

        Returns:
            True on success, False when the socket could not be created.
        """
        try:
            if self.udp_socket:
                self.udp_socket.close()
            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.logger.info(
                f"UDP socket configured for {self.udp_config['host']}:{self.udp_config['port']}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Error configuring UDP socket: {e}")
            return False

    def send_to_plotjuggler(self, data: Dict[str, Any]):
        """Send one sample to PlotJuggler as a UTF-8 JSON datagram.

        The payload is ``{"timestamp": <epoch seconds>, "data": data}``.
        Nothing is sent when no socket is open; send errors are logged.
        """
        if not self.udp_socket:
            return
        try:
            message = {"timestamp": time.time(), "data": data}
            json_message = json.dumps(message)
            self.udp_socket.sendto(
                json_message.encode("utf-8"),
                (self.udp_config["host"], self.udp_config["port"]),
            )
        except Exception as e:
            self.logger.error(f"Error sending data to PlotJuggler: {e}")

    def streaming_loop(self):
        """Main streaming loop; runs until ``self.streaming`` becomes False.

        Each cycle reads all variables, forwards non-empty samples over UDP,
        logs them, and sleeps for the remainder of the sampling interval. An
        unexpected error ends the loop and clears ``self.streaming`` so the
        reported status matches reality.
        """
        self.logger.info(
            f"Starting streaming with interval of {self.sampling_interval}s"
        )
        while self.streaming:
            try:
                start_time = time.time()
                # Read all variables
                data = self.read_all_variables()
                if data:
                    # Send to PlotJuggler
                    self.send_to_plotjuggler(data)
                    # Log data (timestamp trimmed to millisecond precision)
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    self.logger.info(f"[{timestamp}] {data}")
                # Maintain sampling interval
                elapsed = time.time() - start_time
                sleep_time = max(0, self.sampling_interval - elapsed)
                time.sleep(sleep_time)
            except Exception as e:
                self.logger.error(f"Error in streaming loop: {e}")
                # The loop is about to die; do not keep advertising it as active.
                self.streaming = False
                break

    def start_streaming(self) -> bool:
        """Start data streaming in a daemon thread.

        Returns:
            True when streaming is running after the call (including the case
            where it was already running), False when the PLC is not
            connected, no variables are configured, or the UDP socket could
            not be created.
        """
        if not self.connected:
            self.logger.error("PLC not connected")
            return False
        if not self.variables:
            self.logger.error("No variables configured")
            return False
        # A second start must not spawn a second worker (and must not close
        # the socket the running worker is using).
        if self.streaming and self.stream_thread and self.stream_thread.is_alive():
            self.logger.warning("Streaming already running")
            return True
        if not self.setup_udp_socket():
            return False
        self.streaming = True
        self.stream_thread = threading.Thread(target=self.streaming_loop)
        self.stream_thread.daemon = True
        self.stream_thread.start()
        self.logger.info("Streaming started")
        return True

    def stop_streaming(self):
        """Stop streaming: signal the worker, wait briefly, close the socket."""
        self.streaming = False
        worker = self.stream_thread
        if worker:
            # Bounded wait: the worker may be sleeping for a whole interval.
            worker.join(timeout=2)
            if not worker.is_alive():
                self.stream_thread = None
        if self.udp_socket:
            self.udp_socket.close()
            self.udp_socket = None
        self.logger.info("Streaming stopped")

    def get_status(self) -> Dict[str, Any]:
        """Return the current connection, streaming and configuration state."""
        return {
            "plc_connected": self.connected,
            "streaming": self.streaming,
            "plc_config": self.plc_config,
            "udp_config": self.udp_config,
            "variables_count": len(self.variables),
            "sampling_interval": self.sampling_interval,
        }
# Global streamer instance shared by every route handler below. Constructing
# it configures logging and loads any saved configuration and variables.
streamer = PLCDataStreamer()
@app.route("/")
def index():
    """Render the main page with the current status and variable table."""
    context = {
        "status": streamer.get_status(),
        "variables": streamer.variables,
    }
    return render_template("index.html", **context)
@app.route("/api/plc/config", methods=["POST"])
def update_plc_config():
    """Update the PLC connection settings from a JSON request body.

    Expected body: a JSON object with optional ``ip``, ``rack`` and ``slot``.
    Fields that are missing keep their currently configured value. Responds
    with HTTP 400 and an explanatory message when the body is not a JSON
    object or a value is invalid.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body,
        # so the client gets a clear message rather than an attribute error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return (
                jsonify({"success": False, "message": "Request body must be a JSON object"}),
                400,
            )
        current = streamer.plc_config
        ip = data.get("ip", current.get("ip", "192.168.1.100"))
        rack = int(data.get("rack", current.get("rack", 0)))
        slot = int(data.get("slot", current.get("slot", 2)))
        if not isinstance(ip, str) or not ip.strip():
            return jsonify({"success": False, "message": "Invalid IP address"}), 400
        if rack < 0 or slot < 0:
            return (
                jsonify({"success": False, "message": "Rack and slot must be non-negative"}),
                400,
            )
        streamer.update_plc_config(ip.strip(), rack, slot)
        return jsonify({"success": True, "message": "PLC configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/udp/config", methods=["POST"])
def update_udp_config():
    """Update the UDP target from a JSON request body.

    Expected body: a JSON object with optional ``host`` (default
    ``"127.0.0.1"``) and ``port`` (default ``9870``). Responds with HTTP 400
    when the body is not a JSON object, the host is empty, or the port is
    outside 1-65535.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return (
                jsonify({"success": False, "message": "Request body must be a JSON object"}),
                400,
            )
        host = data.get("host", "127.0.0.1")
        port = int(data.get("port", 9870))
        if not isinstance(host, str) or not host.strip():
            return jsonify({"success": False, "message": "Invalid host"}), 400
        # Reject ports the socket layer cannot address; otherwise every later
        # send would fail at streaming time instead of here.
        if not 1 <= port <= 65535:
            return (
                jsonify({"success": False, "message": "Port must be between 1 and 65535"}),
                400,
            )
        streamer.update_udp_config(host.strip(), port)
        return jsonify({"success": True, "message": "UDP configuration updated"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/plc/connect", methods=["POST"])
def connect_plc():
    """Try to connect to the PLC; answer 500 when the attempt fails."""
    if not streamer.connect_plc():
        failure = {"success": False, "message": "Error connecting to PLC"}
        return jsonify(failure), 500
    return jsonify({"success": True, "message": "Connected to PLC"})
@app.route("/api/plc/disconnect", methods=["POST"])
def disconnect_plc():
    """Stop any running stream, then drop the PLC connection."""
    # Streaming is halted first so the worker is not reading while the
    # connection goes away.
    streamer.stop_streaming()
    streamer.disconnect_plc()
    reply = {"success": True, "message": "Disconnected from PLC"}
    return jsonify(reply)
@app.route("/api/variables", methods=["POST"])
def add_variable():
    """Add a new variable from a JSON request body.

    Expected body: a JSON object with ``name`` (non-empty string), ``db`` and
    ``offset`` (non-negative integers) and ``type`` (one of ``real``, ``int``,
    ``bool``, ``dint``). Responds with HTTP 400 on any invalid input.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return (
                jsonify({"success": False, "message": "Request body must be a JSON object"}),
                400,
            )
        name = data.get("name")
        db = int(data.get("db"))
        offset = int(data.get("offset"))
        var_type = data.get("type")
        if (
            not isinstance(name, str)
            or not name
            or var_type not in ["real", "int", "bool", "dint"]
        ):
            return jsonify({"success": False, "message": "Invalid data"}), 400
        # A negative block number or byte offset can never be read; refuse it
        # here rather than logging a read error on every sampling cycle.
        if db < 0 or offset < 0:
            return (
                jsonify({"success": False, "message": "DB number and offset must be non-negative"}),
                400,
            )
        streamer.add_variable(name, db, offset, var_type)
        return jsonify({"success": True, "message": f"Variable {name} added"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/variables/<name>", methods=["DELETE"])
def remove_variable(name):
    """Delete the variable called *name*; always reports success."""
    streamer.remove_variable(name)
    reply = {"success": True, "message": f"Variable {name} removed"}
    return jsonify(reply)
@app.route("/api/streaming/start", methods=["POST"])
def start_streaming():
    """Start the streaming worker; answer 500 when it could not be started."""
    if not streamer.start_streaming():
        failure = {"success": False, "message": "Error starting streaming"}
        return jsonify(failure), 500
    return jsonify({"success": True, "message": "Streaming started"})
@app.route("/api/streaming/stop", methods=["POST"])
def stop_streaming():
    """Stop the streaming worker and confirm."""
    streamer.stop_streaming()
    reply = {"success": True, "message": "Streaming stopped"}
    return jsonify(reply)
@app.route("/api/sampling", methods=["POST"])
def update_sampling():
    """Update the sampling interval from a JSON request body.

    Expected body: a JSON object with optional ``interval`` in seconds
    (default ``0.1``). Values below 0.01 are raised to 0.01. Responds with
    HTTP 400 when the body is not a JSON object or the interval is not a
    finite number.
    """
    import math  # local import: only this handler needs it

    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return (
                jsonify({"success": False, "message": "Request body must be a JSON object"}),
                400,
            )
        interval = float(data.get("interval", 0.1))
        # NaN compares False against the 0.01 floor and would make the loop
        # sleep for 0 s; infinity would make the sleep call fail.
        if not math.isfinite(interval):
            return (
                jsonify({"success": False, "message": "Interval must be a finite number"}),
                400,
            )
        interval = max(interval, 0.01)
        streamer.update_sampling_interval(interval)
        return jsonify({"success": True, "message": f"Interval updated to {interval}s"})
    except Exception as e:
        return jsonify({"success": False, "message": str(e)}), 400
@app.route("/api/status")
def get_status():
    """Return the streamer's status dictionary as JSON."""
    snapshot = streamer.get_status()
    return jsonify(snapshot)
if __name__ == "__main__":
    # Create templates directory if it doesn't exist
    os.makedirs("templates", exist_ok=True)
    print("🚀 Starting Flask server for PLC S7-315 Streamer")
    print("📊 Web interface available at: http://localhost:5050")
    print("🔧 Configure your PLC and variables through the web interface")
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it allows code execution from the browser. It can
    # still be enabled explicitly for local development via FLASK_DEBUG.
    debug_mode = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    try:
        app.run(debug=debug_mode, host="0.0.0.0", port=5050)
    except KeyboardInterrupt:
        print("\n⏹️ Stopping server...")
    finally:
        # Cleanup lives in `finally` because the development server normally
        # handles Ctrl-C itself and returns, so an `except KeyboardInterrupt`
        # branch alone would not run on a regular shutdown.
        streamer.stop_streaming()
        streamer.disconnect_plc()