ACIL FM
Dark
Refresh
Current DIR:
/usr/lib/python3.9/site-packages/dnf-plugins
/
usr
lib
python3.9
site-packages
dnf-plugins
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
builddep.py
9.13 MB
chmod
View
DL
Edit
Rename
Delete
changelog.py
4.85 MB
chmod
View
DL
Edit
Rename
Delete
config_manager.py
10.63 MB
chmod
View
DL
Edit
Rename
Delete
copr.py
29.59 MB
chmod
View
DL
Edit
Rename
Delete
debug.py
12.26 MB
chmod
View
DL
Edit
Rename
Delete
debuginfo-install.py
10.82 MB
chmod
View
DL
Edit
Rename
Delete
download.py
12.04 MB
chmod
View
DL
Edit
Rename
Delete
generate_completion_cache.py
3.86 MB
chmod
View
DL
Edit
Rename
Delete
groups_manager.py
13.21 MB
chmod
View
DL
Edit
Rename
Delete
kpatch.py
12.75 MB
chmod
View
DL
Edit
Rename
Delete
needs_restarting.py
13.52 MB
chmod
View
DL
Edit
Rename
Delete
notify_packagekit.py
1.5 MB
chmod
View
DL
Edit
Rename
Delete
repoclosure.py
6.89 MB
chmod
View
DL
Edit
Rename
Delete
repodiff.py
11.21 MB
chmod
View
DL
Edit
Rename
Delete
repograph.py
4 MB
chmod
View
DL
Edit
Rename
Delete
repomanage.py
10.32 MB
chmod
View
DL
Edit
Rename
Delete
reposync.py
14.67 MB
chmod
View
DL
Edit
Rename
Delete
spacewalk.py
13.88 MB
chmod
View
DL
Edit
Rename
Delete
system_upgrade.py
26.88 MB
chmod
View
DL
Edit
Rename
Delete
universal_hooks.py
5.78 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/lib/python3.9/site-packages/dnf-plugins/spacewalk.py
# # Copyright (C) 2015 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # from __future__ import absolute_import from __future__ import unicode_literals from dnfpluginscore import _, logger import dnf import dnf.exceptions import errno import json import librepo import os from copy import copy from dnf.conf.config import PRIO_PLUGINCONFIG import up2date_client.up2dateAuth import up2date_client.config import up2date_client.rhnChannel import up2date_client.rhnPackageInfo from rhn.i18n import ustr from up2date_client import up2dateErrors STORED_CHANNELS_NAME = '_spacewalk.json' RHN_DISABLED = _("CloudLinux Network based repositories will be disabled.") CHANNELS_DISABLED = _("CloudLinux Network channel support will be disabled.") COMMUNICATION_ERROR = _("There was an error communicating with CloudLinux Network server.") NOT_REGISTERED_ERROR = _("This system is not registered with CloudLinux Network server.") NOT_SUBSCRIBED_ERROR = _("This system is not subscribed to any channels.") NO_SYSTEM_ID_ERROR = _("SystemId could not be acquired.") USE_RHNREGISTER = _("You can use rhn_register to register.") UPDATES_FROM_SPACEWALK = _("This system is receiving updates from CloudLinux Network server.") GPG_KEY_REJECTED = _("For security reasons packages from CloudLinux Network based repositories can be verified only with locally installed gpg keys. GPG key '%s' has been rejected.") PROFILE_NOT_SENT = _("Package profile information could not be sent.") MISSING_HEADER = _("Missing required login information for CloudLinux Network: %s") LEAPP_IN_PROGRESS = _("Leapp upgrade is running - using cache.") MUST_BE_ROOT = _('Spacewalk plugin has to be run under with the root privileges.') class Spacewalk(dnf.Plugin): name = 'spacewalk' def __init__(self, base, cli): super(Spacewalk, self).__init__(base, cli) self.base = base self.cli = cli self.stored_channels_path = os.path.join(self.base.conf.persistdir, STORED_CHANNELS_NAME) self.connected_to_spacewalk = False self.up2date_cfg = {} self.conf = copy(self.base.conf) self.parser = self.read_config(self.conf) if "main" in self.parser.sections(): options = self.parser.items("main") for (key, value) in options: self.conf._set_value(key, value, PRIO_PLUGINCONFIG) if not dnf.util.am_i_root(): logger.warning(MUST_BE_ROOT) self.conf.enabled = False if not self.conf.enabled: return logger.debug('initialized Spacewalk plugin') self.activate_channels() def config(self): if not self.conf.enabled: return # cli is None when plugin is executed # though automatic actions or other pluings # like in case of leapp which uses custom dnf plugin if not self.cli: return self.cli.demands.root_user = True def clnreg(self): os.system('/usr/sbin/clnreg_ks --strict-edition') def activate_channels(self, networking=True): enabled_channels = {} sslcacert = None force_http = 0 proxy_url = None login_info = None cached_channels = self._read_channels_file() if not networking: # no network communication, use list of channels from persistdir enabled_channels = cached_channels elif os.path.isfile("/etc/cln_leapp_in_progress"): # networking is true, but CLN urls won't be accessible, use cache logger.warning(LEAPP_IN_PROGRESS) enabled_channels = cached_channels else: # setup proxy according to up2date self.up2date_cfg = up2date_client.config.initUp2dateConfig() sslcacert = get_ssl_ca_cert(self.up2date_cfg) force_http = self.up2date_cfg['useNoSSLForPackages'], # trying to register system once in case of error while getLoginInfo clnreg_tried = False while not clnreg_tried: try: login_info = up2date_client.up2dateAuth.getLoginInfo(timeout=self.conf.timeout) clnreg_tried = True except up2dateErrors.RhnServerException as e: if clnreg_tried == False: self.clnreg() clnreg_tried = True continue logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e) return if not login_info: logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED) self._write_channels_file({}) return try: svrChannels = up2date_client.rhnChannel.getChannelDetails( timeout=self.conf.timeout) except up2dateErrors.CommunicationError as e: logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e) return except up2dateErrors.NoChannelsError: logger.error("%s\n%s", NOT_SUBSCRIBED_ERROR, CHANNELS_DISABLED) self._write_channels_file({}) return except up2dateErrors.NoSystemIdError: logger.error("%s %s\n%s\n%s", NOT_SUBSCRIBED_ERROR, NO_SYSTEM_ID_ERROR, USE_RHNREGISTER, RHN_DISABLED) return self.connected_to_spacewalk = True logger.info(UPDATES_FROM_SPACEWALK) for channel in svrChannels: if channel['version']: enabled_channels[channel['label']] = dict(channel.items()) self._write_channels_file(enabled_channels) repos = self.base.repos for (channel_id, channel_dict) in enabled_channels.items(): cached_channel = cached_channels.get(channel_id) cached_version = None if cached_channel: cached_version = cached_channel.get('version') conf = copy(self.conf) if channel_id in self.parser.sections(): options = self.parser.items(channel_id) for (key, value) in options: conf._set_value(key, value, PRIO_PLUGINCONFIG) repo = SpacewalkRepo(channel_dict, { 'conf' : self.base.conf, 'proxy' : proxy_url, 'timeout' : conf.timeout, 'sslcacert' : sslcacert, 'force_http': force_http, 'cached_version' : cached_version, 'login_info': login_info, 'gpgcheck': conf.gpgcheck, 'enabled': conf.enabled, }) repos.add(repo) # DEBUG logger.debug(enabled_channels) def transaction(self): """ Update system's profile after transaction. """ if not self.conf.enabled: return if not self.connected_to_spacewalk: # not connected so nothing to do here return if self.up2date_cfg['writeChangesToLog'] == 1: delta = self._make_package_delta() up2date_client.rhnPackageInfo.logDeltaPackages(delta) try: up2date_client.rhnPackageInfo.updatePackageProfile( timeout=self.conf.timeout) except up2dateErrors.RhnServerException as e: logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e) def _read_channels_file(self): try: with open(self.stored_channels_path, "r") as channels_file: content = channels_file.read() channels = json.loads(content) return channels except (FileNotFoundError, IOError) as e: if e.errno != errno.ENOENT: raise except json.decoder.JSONDecodeError as e: pass # ignore broken json and recreate it later return {} def _write_channels_file(self, var): try: with open(self.stored_channels_path, "w") as channels_file: json.dump(var, channels_file, indent=4) except (FileNotFoundError, IOError) as e: if e.errno != errno.ENOENT: raise def _make_package_delta(self): delta = {'added' : [(p.name, p.version, p. release, p.epoch, p.arch) for p in self.base.transaction.install_set], 'removed': [(p.name, p.version, p. release, p.epoch, p.arch) for p in self.base.transaction.remove_set], } return delta class SpacewalkRepo(dnf.repo.Repo): """ Repository object for Spacewalk. Uses up2date libraries. """ needed_headers = ['X-RHN-Server-Id', 'X-RHN-Auth-User-Id', 'X-RHN-Auth', 'X-RHN-Auth-Server-Time', 'X-RHN-Auth-Expire-Offset'] def __init__(self, channel, opts): super(SpacewalkRepo, self).__init__(ustr(channel['label']), opts.get('conf')) # dnf stuff self.name = ustr(channel['name']) self.baseurl = [ url + '/GET-REQ/' + self.id for url in channel['url']] self.sslcacert = opts.get('sslcacert') self.proxy = opts.get('proxy') try: self.gpgkey = get_gpg_key_urls(channel['gpg_key_url']) except InvalidGpgKeyLocation as e: logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e)) self.gpgkey = [] if channel['version'] != opts.get('cached_version'): self.metadata_expire = 1 # spacewalk stuff self.login_info = opts.get('login_info') self.keepalive = 0 self.bandwidth = 0 self.retries = 1 self.throttle = 0 self.timeout = opts.get('timeout') self.gpgcheck = opts.get('gpgcheck') self.force_http = opts.get('force_http') if opts.get('enabled'): self.enable() else: self.disable() if hasattr(self, 'set_http_headers'): # dnf > 4.0.9 on RHEL 8, Fedora 29/30 http_headers = self.create_http_headers() if http_headers: self.set_http_headers(http_headers) def create_http_headers(self): http_headers = [] if not self.login_info: return http_headers for header in self.needed_headers: if not header in self.login_info: error = MISSING_HEADER % header raise dnf.Error.RepoError(error) if self.login_info[header] in (None, ''): # This doesn't work due to bug in librepo (or even deeper in libcurl) # the workaround bellow can be removed once BZ#1211662 is fixed #http_headers.append("%s;" % header) http_headers.append("%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header) else: http_headers.append("%s: %s" % (header, self.login_info[header])) if not self.force_http: http_headers.append("X-RHN-Transport-Capability: follow-redirects=3") return http_headers def _handle_new_remote(self, destdir, mirror_setup=True): # this function is called only on dnf < 3.6.0 (up to Fedora 29) handle = super(SpacewalkRepo, self)._handle_new_remote(destdir, mirror_setup) http_headers = self.create_http_headers() if http_headers: handle.setopt(librepo.LRO_HTTPHEADER, http_headers) return handle # FIXME # all rutines bellow should go to rhn-client-tools because they are share # between yum-rhn-plugin and dnf-plugin-spacewalk def get_gpg_key_urls(key_url_string): """ Parse the key urls and validate them. key_url_string is a space seperated list of gpg key urls that must be located in /etc/pkg/rpm-gpg/. Return a list of strings containing the key urls. Raises InvalidGpgKeyLocation if any of the key urls are invalid. """ key_urls = key_url_string.split() for key_url in key_urls: if not is_valid_gpg_key_url(key_url): raise InvalidGpgKeyLocation(key_url) return key_urls class InvalidGpgKeyLocation(Exception): pass def is_valid_gpg_key_url(key_url): proto_split = key_url.split('://') if len(proto_split) != 2: return False proto, path = proto_split if proto.lower() != 'file': return False path = os.path.normpath(path) if not path.startswith('/etc/pki/rpm-gpg/'): return False return True def get_ssl_ca_cert(up2date_cfg): if not ('sslCACert' in up2date_cfg and up2date_cfg['sslCACert']): raise BadSslCaCertConfig ca_certs = up2date_cfg['sslCACert'] if type(ca_certs) == list: return ca_certs[0] return ca_certs
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply