ACIL FM
Dark
Refresh
Current DIR:
/opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos
/
opt
cloudlinux
venv
lib
python3.11
site-packages
clwpos
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
bin
-
chmod
Open
Rename
Delete
cli_versions
-
chmod
Open
Rename
Delete
feature_suites
-
chmod
Open
Rename
Delete
hooks
-
chmod
Open
Rename
Delete
migrations
-
chmod
Open
Rename
Delete
object_cache
-
chmod
Open
Rename
Delete
optimization_features
-
chmod
Open
Rename
Delete
php
-
chmod
Open
Rename
Delete
user
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
billing.py
6.24 MB
chmod
View
DL
Edit
Rename
Delete
cl_wpos_exceptions.py
3.59 MB
chmod
View
DL
Edit
Rename
Delete
constants.py
5.56 MB
chmod
View
DL
Edit
Rename
Delete
create_user_uid_dirs.py
754 B
chmod
View
DL
Edit
Rename
Delete
cron.py
2.14 MB
chmod
View
DL
Edit
Rename
Delete
daemon.py
37.12 MB
chmod
View
DL
Edit
Rename
Delete
daemon_base.py
2.84 MB
chmod
View
DL
Edit
Rename
Delete
daemon_config.py
621 B
chmod
View
DL
Edit
Rename
Delete
daemon_redis_lib.py
11.93 MB
chmod
View
DL
Edit
Rename
Delete
daemon_subscription_handler.py
6.44 MB
chmod
View
DL
Edit
Rename
Delete
data_collector_utils.py
9.42 MB
chmod
View
DL
Edit
Rename
Delete
logsetup.py
4.04 MB
chmod
View
DL
Edit
Rename
Delete
papi.py
9.87 MB
chmod
View
DL
Edit
Rename
Delete
parse.py
2.1 MB
chmod
View
DL
Edit
Rename
Delete
redis_configuration_pid_file_cleaner.py
1.01 MB
chmod
View
DL
Edit
Rename
Delete
report_generator.py
21.18 MB
chmod
View
DL
Edit
Rename
Delete
scoped_cache.py
1.34 MB
chmod
View
DL
Edit
Rename
Delete
socket_utils.py
4.03 MB
chmod
View
DL
Edit
Rename
Delete
stats.py
12.02 MB
chmod
View
DL
Edit
Rename
Delete
utils.py
58.25 MB
chmod
View
DL
Edit
Rename
Delete
whmcs_utils.py
9.36 MB
chmod
View
DL
Edit
Rename
Delete
wpos_admin.py
67.14 MB
chmod
View
DL
Edit
Rename
Delete
wpos_hooks.py
4.85 MB
chmod
View
DL
Edit
Rename
Delete
wpos_req_scanner.py
4.38 MB
chmod
View
DL
Edit
Rename
Delete
wp_config.py
725 B
chmod
View
DL
Edit
Rename
Delete
wp_utils.py
16.33 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
928 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos/wp_utils.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import import json import os import pwd import re import subprocess from dataclasses import dataclass from typing import Optional, Union, List from functools import lru_cache from typing_extensions import TypedDict from clwpos import gettext as _ from clwpos.cl_wpos_exceptions import WposError, WpCliCommandError from clwpos.constants import RedisRequiredConstants, WP_CLI, WP_CLI_SKIP_PLUGINS_ENV from clwpos.data_collector_utils import php_info from clwpos.logsetup import ( setup_logging, ADMIN_LOGFILE_PATH, USER_LOGFILE_PATH ) from clwpos.php.base import PHP from clwpos.scoped_cache import cached_in_scope from clwpos.user.cache import wp_config_cache from clwpos.user.website_check import RollbackException, WebsiteCheckError from clwpos.user.website_check.errors import ( WebsiteCheckBadHttpCode, CDNActivationFailed, JSCssCheckBadHttpCode ) from clwpos.utils import ( WposUser, is_run_under_user, wp_cli_compatibility_check, run_in_cagefs_if_needed, user_name, litespeed_is_running, nginx_is_running, apache2nginx_is_running, is_wp2_environment_safe ) _logger = setup_logging(__name__) @dataclass class WordpressError: message: str context: dict code: Optional[int] = None def get_php_version(abs_wp_path: str) -> PHP: """ Return php_version that will be used for calling wp-cli commands. If 'CLWPOS_USE_SAVED_PHP_VERSION' envar is defined, try to get this version from a previously saved file. """ use_saved_php_version = bool(os.environ.get("CLWPOS_USE_SAVED_PHP_VERSION")) if use_saved_php_version: php_version = _get_saved_php_version(abs_wp_path) or _get_php_version(abs_wp_path) else: php_version = _get_php_version(abs_wp_path) return php_version @lru_cache(maxsize=None) def _get_php_version(abs_wp_path: str) -> PHP: """Return PHP version.""" result = php_info() items = [] for item in result: if abs_wp_path.startswith(os.path.normpath(item["documentroot"])): items.append((item["documentroot"], item["version"])) items.sort(reverse=True) return items[0][1] def _get_saved_php_version(abs_wp_path: str) -> Optional[PHP]: """ Get domain's php version from a previously saved file. """ if not is_run_under_user(): raise WposError('Internal Error. Contact CloudLinux support') php_file_id = os.environ.get("CLWPOS_PHP_FILE_ID") php_info_file = WposUser(user_name()).php_info.format(file_id=php_file_id) if not os.path.exists(php_info_file): return None try: with open(php_info_file) as f: _php_info = json.load(f) except (OSError, json.decoder.JSONDecodeError) as e: _logger.exception("Error during reading of \".php_info\" file: %s", e) return None php_versions = [] for vhost_info in _php_info: if abs_wp_path.startswith(vhost_info["documentroot"]): php_versions.append((vhost_info["documentroot"], vhost_info["version"])) if not php_versions: return None return PHP(**sorted( php_versions, key=lambda item: item[1]['identifier'], reverse=True )[0][1]) def wordpress(path: str, command: str, subcommand: str, *args, env=None) -> Union[str, WordpressError]: """ Helper to execute wp commands, for example wp --path=<path> plugin install redis-cache wp --path=<path> plugin activate redis-cache wp --path=<path> redis enable wp --path=<path> plugin deactivate redis-cache wp --path=<path> plugin uninstall redis-cache @return: stderr if error was happened. """ php_version = get_php_version(path) php_bin_path = str(php_version.bin) if not os.path.exists(php_bin_path): _logger.exception("Error during wp-cli command execution \"%s\": " "invalid path to binary file \"%s\"", command, php_bin_path) return WordpressError( message=_("Error during resolving path to php binary file:\n" "got non-existent path \"%(path)s\"."), context={"path": php_bin_path} ) # [attention] compatibility check may raise WpCliUnsupportedException exception wp_cli_compatibility_check(php_version) command_part = [command, subcommand, *args] full_command = [ WP_CLI, php_bin_path, path, *command_part ] environment = env or {} _logger.info('Executing command=%s', ' '.join(full_command)) try: output = run_in_cagefs_if_needed(full_command, check=True, env=environment) except subprocess.CalledProcessError as error: post_check_flag = 'Post-check failed' in error.stderr command = ' '.join(full_command) _logger.exception("Error during command execution: \n%s\n" "stdout=%s\n" "stderr=%s", command, error.stdout, error.stderr) logger_path = ADMIN_LOGFILE_PATH if not os.getuid() else USER_LOGFILE_PATH.format( homedir=pwd.getpwuid(os.getuid()).pw_dir) if post_check_flag: _process_post_check(error, logger_path) else: message = _("Unexpected error happened during command execution: '%(command)s'.\n" "Event is logged to file: '%(logger_path)s' with stdout and stderr recorded.") context = { "command": command, "logger_path": logger_path, "error_desc": error.stderr } return WordpressError( message=message, context=context, code=error.returncode ) _logger.info('Command=%s returned output=%s', ' '.join(full_command), str(output.stdout)) return output.stdout def _process_post_check( result: subprocess.CalledProcessError, log_path: str): """ In some rare cases plugin installation (CDN) may return post-check errors which we should handle properly. """ # case 1: plugin did not replace static links on website # with cdn urls for some reason cnd_not_active_msg = "CDN url not found on page" if cnd_not_active_msg in result.stderr: raise RollbackException(CDNActivationFailed(log_path)) # case 2: website returned error code during check # URL ' . $this->url . ' returned unexpected response status (' . $this->url_code . ‘)’ url_match = 'URL ' unexpected_error_match = 'returned unexpected response status' if url_match in result.stderr and unexpected_error_match in result.stderr: # "URL https://.../ ", " status (300)" url_part, code_part = result.stderr.split(unexpected_error_match) url = url_part.split()[1] code = code_part.strip("( )’'") raise RollbackException( WebsiteCheckBadHttpCode(url, code) ) # case 3: website itself works fine, but plugin was not able to reach static files # CDN url returned unexpected response status (' . $this->static_code . ’) unexpected_error_static_content = 'CDN url returned unexpected response status' if unexpected_error_static_content in result.stderr: # "URL https://.../ ", " status (300)" code_part = result.stderr.split('(')[1] code = code_part.strip("( )’'") raise RollbackException( JSCssCheckBadHttpCode(code) ) # case 4: unknown error message = _("WordPress plugin failed to activate correctly. " "Changes were reverted and caching module is now disabled.\n" "Error reported from plugin: \n" "%(error_desc)s.\n\n" "Event is logged to file: '%(logger_path)s' with stdout and stderr recorded.") context = { "command": result.cmd, "logger_path": log_path, "error_desc": result.stdout + result.stderr } raise RollbackException(WebsiteCheckError( header='Post check failed', description=message, context=context, fix_tip='', )) def is_multisite(path: str) -> bool: marker = 'cl_multisite_detected' command = 'if ( is_multisite() ) { echo "%s"; } else { echo "not_multisite"; }' % marker with wp_config_cache('is_multisite', path=path) as record: if record.data: return marker in record.data result = wordpress(path, 'eval', command, env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) if isinstance(result, WordpressError): raise WposError(message=result.message, context=result.context) record.data = result return marker in result def wp_get_constant(wp_path: str, constant: str, raise_exception=False) -> Optional[str]: """ Get: - defined constant value - None in case of error - empty string if no such constant found """ command = "if (defined('%(const)s')) { echo %(const)s; }" % {'const': constant} with wp_config_cache('const.' + constant, path=wp_path) as record: if record.data: return record.data result = wordpress(wp_path, 'eval', command, env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) if isinstance(result, WordpressError): if raise_exception: raise WpCliCommandError(message=result.message, context=result.context) _logger.error('Error during get WP constant: %s', result) return None record.data = result return result def diagnose_redis_connection_constants(docroot: str, wordpress_path: str): """ Check required constants for redis connection establishment """ redis_schema = wp_get_constant(os.path.join(docroot, wordpress_path), RedisRequiredConstants.WP_REDIS_SCHEME.name, raise_exception=True) if not redis_schema and redis_schema != RedisRequiredConstants.WP_REDIS_SCHEME.val: raise WposError('WordPress constant "%(constant)s" is not defined or defined with wrong value %(value)s', context={'constant': RedisRequiredConstants.WP_REDIS_SCHEME.name, 'value': redis_schema}) socket = wp_get_constant(os.path.join(docroot, wordpress_path), RedisRequiredConstants.WP_REDIS_PATH.name, raise_exception=True) if not socket: raise WposError('WordPress constant "%(constant)s" is not defined', context={'constant': RedisRequiredConstants.WP_REDIS_PATH.name}) if not os.path.exists(socket): raise WposError('Redis socket %(socket)s does not exist in the system', context={'socket': socket}) def obtain_wp_cli_env(): """ Returns needed envars for wp-cli """ server_software = 'Apache' server_software_extra = '' if litespeed_is_running(): server_software = 'LiteSpeed' elif nginx_is_running(): server_software = 'nginx' if apache2nginx_is_running(): server_software_extra = 'CloudLinux MAx Webserver' return { 'SERVER_SOFTWARE': server_software, 'SERVER_SOFTWARE_EXTRA': server_software_extra, } class PluginInfo(TypedDict): # e.g. "clsop", name: str # e.g. "active", status: str # e.g. "none", update: str # e.g. "3.12.6.1-1-2" version: str @cached_in_scope def _cached_plugin_list(abs_wp_path: str, wp_cli_env=None) -> List[PluginInfo]: if wp_cli_env: wp_cli_env = dict(wp_cli_env) wp_cli_env.update({WP_CLI_SKIP_PLUGINS_ENV: '1'}) else: wp_cli_env = {WP_CLI_SKIP_PLUGINS_ENV: '1'} result = wordpress(abs_wp_path, "plugin", "list", "--json", env=wp_cli_env) # TODO: raise exceptions instead of silent errors if isinstance(result, WordpressError): return [] # Try to find json, when output contains php errors def _maybe_json(output): _result = None pattern = r'(\[{.*?}\])' matches = re.search(pattern, output) if matches: try: json_string = matches.group(1) _result = json.loads(json_string) except Exception: pass return _result try: plugin_list_raw = json.loads(result) except (ValueError, TypeError, json.JSONDecodeError) as e: maybe_json_result = _maybe_json(result) if maybe_json_result is not None: plugin_list_raw = maybe_json_result else: raise WposError( message=_( 'Malformed plugins information received from wp-cli. ' 'Raw response is %(response)s.'), context={'response': result}, details=str(e) ) # https://cl.sentry.cloudlinux.com/organizations/cloudlinux_os/issues/163201/events/f59273162433432e86f2d83bdb4848d7/?project=22 if not isinstance(plugin_list_raw, list): raise WposError( message=_( 'Malformed plugin list received from wp-cli. ' 'Raw response is %(response)s.'), context={'response': result} ) for plugin_info in plugin_list_raw: if not isinstance(plugin_info, dict): raise WposError( message=_( 'Malformed plugin information received from wp-cli. ' 'Raw response is %(response)s.'), context={'response': result} ) return plugin_list_raw def plugin_list(abs_wp_path: str, wp_cli_env=None) -> List[PluginInfo]: return _cached_plugin_list( os.path.normpath(abs_wp_path), tuple(wp_cli_env.items()) if wp_cli_env else None ) def list_active_plugins(abs_wp_path: str, wp_cli_env=None): return [ item for item in plugin_list(abs_wp_path, wp_cli_env) if item['status'] == 'active' ] def get_plugin_data(wordpress_abs_path, plugin_name) -> List[PluginInfo]: plugins_data = plugin_list(abs_wp_path=wordpress_abs_path) if isinstance(plugins_data, WordpressError): raise WposError(message=plugins_data.message, context=plugins_data.context) return [item for item in plugins_data if item['name'] == plugin_name] def is_plugin_activated(abs_wp_path: str, plugin_name: str, wp_cli_env=None) -> bool: return any(item['name'] == plugin_name and item['status'] == 'active' for item in plugin_list(abs_wp_path, wp_cli_env)) def is_plugin_installed(abs_wp_path: str, plugin_name: str, wp_cli_env=None) -> bool: return any(item['name'] == plugin_name for item in plugin_list(abs_wp_path, wp_cli_env)) def update_constant(abs_wp_path: str, constant_name: str, constant_value=None) -> Optional[WordpressError]: if constant_value is None: # Remove constant result = wordpress(abs_wp_path, "config", "has", constant_name, "--type=constant", env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) # Skip, because not zero return code means that constant doesn't exist if isinstance(result, WordpressError): result = None else: result = wordpress(abs_wp_path, "config", "delete", constant_name, "--type=constant", "--quiet", env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) else: # Update/add constant. Try with our anchor first; if it fails (e.g., anchor is missing), # fallback to a plain insertion without anchor to ensure the constant is written. result = wordpress(abs_wp_path, "config", "set", constant_name, constant_value, "--raw", "--type=constant", '--anchor=// End of CloudLinux generated section', "--placement=before", "--quiet", env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) if isinstance(result, WordpressError): # Fallback without anchor/placement in case the anchor doesn't exist or any other failure occurs result = wordpress(abs_wp_path, "config", "set", constant_name, constant_value, "--raw", "--type=constant", "--quiet", env={WP_CLI_SKIP_PLUGINS_ENV: '1'}) return result def update_redis_disable_banners_constant(abs_wp_path: str, constant_value=None) -> Optional[WordpressError]: return update_constant(abs_wp_path, RedisRequiredConstants.WP_REDIS_DISABLE_BANNERS.name, constant_value)
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply