⚝
One Hat Cyber Team
⚝
Your IP:
66.248.200.6
Server IP:
192.124.249.6
Server:
Linux 56.244.72.148.host.secureserver.net 5.14.0-570.62.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Nov 11 10:10:59 EST 2025 x86_64
Server Software:
Apache
PHP Version:
8.1.33
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
lib
/
fm-agent
/
plugins
/
View File Name :
fortisase_connection.py
import os import logging from ipaddress import IPv4Address, IPv6Address from configparser import ConfigParser, NoOptionError import sys import json import time import re import agent_util EPCTRL_LOG = "/Library/Application Support/Fortinet/FortiClient/Logs/epctrl.log" class FortisaseVPNConnection: def __init__(self): self.connection_state = None self.connection_name = None self.connected_address = None self.valid_tunnels = [ "Secure Internet Access", "Secure Internet Access - IPsec", "FortiSASE Cloud Security", "SASE Secure Internet Access", ] self.log = logging.getLogger("fortisase") def parse_connection(self, data): """ Receive a json data that contains the information of the VPN Connection, and its ip address """ connections = json.loads(data) for connection in connections: tunnel_name = connection.get("tunnel_name") connected = bool(connection.get("connected")) ip = connection.get("ip_address") if tunnel_name in self.valid_tunnels and connected: # Tunnel is valid self.connection_state = "Connected" self.connection_name = tunnel_name if ":" in ip: self.connected_address = IPv6Address(ip) else: self.connected_address = IPv4Address(ip) return True return False def is_sia_tunnel_up(self): """ Return True only if the tunnel name and connection state is in acceptable parameters. """ return ( self.connection_name in self.valid_tunnels and self.connection_state == "Connected" ) def get_public_ip_address(self): """ Parse the epctrl log file from FortiClient, backwards and return the first succesfull hit of the public_ip_checker found. Break after finding it. """ if not os.path.exists(EPCTRL_LOG): self.log.warning(f"{EPCTRL_LOG} file not found.") return start = time.time() ip_address = None with open(EPCTRL_LOG, "rb") as opened: file_size = opened.seek(0, os.SEEK_END) position = file_size while ip_address is None: if position == 0: break position = max(0, position - 5000) opened.seek(position) chunk = opened.read(min(5000, file_size - position)) ip_address = self._parse_log_chunk(chunk.decode()) end = time.time() self.log.info( f"Parsed {EPCTRL_LOG} in {end - start} seconds. Found: {ip_address}" ) return ip_address def _parse_log_chunk(self, chunk: str): for line in chunk.split("\n"): match = re.search( r"^.*\spublic_ip_checker:\d*\sGot public IP from ipify:\s(.*)$", line ) if match: try: return IPv4Address(match.groups()[0]) except Exception as err: self.log.warning( f"Unable to parse ipadress {match.groups()}. Err {err}" ) class FortiSaseConnection(agent_util.Plugin): textkey = "fortisase" label = "FortiSase" log = logging.getLogger("fortisase") # Default config file for Agent Config for FortiSase installations. # We use it to determine the installation type. If file is not present, is not valid config_file = "/usr/local/FortiMonitor/agent/config/fm-agent/fm_agent.cfg" @classmethod def get_metadata(cls, config): status = agent_util.SUPPORTED msg = None # Plugin only set to work on OSX for now. if sys.platform.lower() != "darwin": return {} # Agent needs to be a FortiSase installation to work as well. if not os.path.exists(cls.config_file): cls.log.info( "Agent config file not found. Unable to determine handshake type" ) return {} config_reader = ConfigParser() config_reader.read(cls.config_file) try: is_fortisase_install = ( config_reader.get("agent", "handshake_type").lower() == "forticlient" ) except NoOptionError: is_fortisase_install = False if not is_fortisase_install: status = agent_util.UNSUPPORTED msg = "Agent installation is not FortiSase" if status == agent_util.UNSUPPORTED: cls.log.info(f"Fortisase connection plugin disabled. {msg}") return {} metadata = { "osx.connected_sia": { "label": "Connected SIA", "options": None, "status": status, "error_msg": msg, "unit": "bool", }, "osx.turbo_ip": { "label": "Turbo IP", "options": None, "status": status, "error_msg": msg, }, "osx.public_ip": { "label": "Endpoint Public Ip", "options": None, "status": status, "error_msg": msg, }, } return metadata def check(self, textkey, data, config): try: # This file configuration is only available on FortiClient 7.4.4 and above. vpn_data = "/Library/Application Support/Fortinet/FortiClient/data/vpn_status_info.json" vpn_data_exists = os.path.exists(vpn_data) connected, metric = None, None client = FortisaseVPNConnection() if vpn_data_exists: with open(vpn_data, "r") as opened: data = opened.read() connected = client.parse_connection(data) else: self.log.warning( f"{vpn_data} file does not exist. Unable to fully collect data" ) if textkey == "osx.connected_sia": # Grab the fortitray connection value from the log file. connected = client.is_sia_tunnel_up() if connected: return 1 else: return 0 elif textkey == "osx.turbo_ip": connected = client.is_sia_tunnel_up() if not connected: # If the connection is not detected we don't need to check the turbo ip. return ip_address = client.connected_address if ip_address: metric = int(ip_address) elif textkey == "osx.public_ip": public_ip = client.get_public_ip_address() if public_ip: metric = int(public_ip) if metric: return float(metric) except Exception as msg: self.log.warning(f"Unable to process the FortiSase files. Error: {msg}") return