Backups Created:
/home/japatmex/public_html/wp-content/edit-wolf.php
Savvy
W
olf -
MANAGER
Edit File: plesk_notifications.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging from functools import lru_cache from pathlib import Path from defence360agent.contracts.plugins import MessageSink from defence360agent.contracts.hooks import HooksConfig from defence360agent.subsys import notifier logger = logging.getLogger(__name__) SCRIPT_PATH = ( "/opt/psa/admin/plib/modules/imunify360/scripts/send-notifications.php" ) HOOK_PATH = "/opt/imunify360/venv/share/imunify360/scripts/send-notifications" EVENTS = [ "CUSTOM_SCAN_MALWARE_FOUND", "USER_SCAN_MALWARE_FOUND", "REALTIME_MALWARE_FOUND", ] class PleskNotificationsHooks(MessageSink): async def create_sink(self, loop): """MessageSink method""" if self.is_supported(): if not self.is_applied(): await self.add_hooks() else: await self.remove_hooks() @lru_cache(maxsize=1) def is_supported(self) -> bool: return Path(SCRIPT_PATH).exists() and Path(HOOK_PATH).exists() def is_applied(self) -> bool: config = HooksConfig().get() return all( [ HOOK_PATH in rule["SCRIPT"]["scripts"] for event, rule in config.get("rules", {}).items() if event in EVENTS ] ) async def add_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } updated = False for event, rule in data["rules"].items(): if HOOK_PATH not in rule["SCRIPT"]["scripts"]: rule["SCRIPT"]["enabled"] = True rule["SCRIPT"]["scripts"].append(HOOK_PATH) updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks added and configuration updated") async def remove_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } updated = False for event, rule in data["rules"].items(): if HOOK_PATH in rule["SCRIPT"]["scripts"]: rule["SCRIPT"]["scripts"].remove(HOOK_PATH) rule["SCRIPT"]["enabled"] = len(rule["SCRIPT"]["scripts"]) != 0 updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks removed and configuration updated")