Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion salt/utils/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import salt.utils.yaml
import salt.version as version
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.utils.validate.path import is_syslog_path
from salt.utils.validate.path import is_writeable
from salt.utils.verify import insecure_log, verify_log, verify_log_files

Expand Down Expand Up @@ -833,7 +834,7 @@ def __setup_logfile_logger_config(self):
),
)

if not is_writeable(logfile, check_parent=True):
if not is_syslog_path(logfile) and not is_writeable(logfile, check_parent=True):
# Since we're not be able to write to the log file or its parent
# directory (if the log file does not exit), are we the same user
# as the one defined in the configuration file?
Expand Down
60 changes: 60 additions & 0 deletions salt/utils/validate/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,67 @@
Several path related validators
"""

import logging.handlers
import os
import pathlib
import stat
from urllib.parse import urlparse


def is_syslog_path(path):
"""
Check if a given path is a syslog path.

<file|udp|tcp>://<host|socketpath>:<port-if-required>/<log-facility>

log-facility is optional; must be a valid syslog facility in upper case (e.g. "LOG_USER", "LOG_DAEMON", etc.)
see https://docs.python.org/3/library/logging.handlers.html#logging.handlers.SysLogHandler

:param path: The path to check
:returns: True or False
"""

parsed_log_path = urlparse(path)

if parsed_log_path.scheme in ("tcp", "udp", "file"):
if parsed_log_path.scheme == "file" and parsed_log_path.path:
path = pathlib.Path(parsed_log_path.path)
facility_name = path.name.upper()
if facility_name.startswith("LOG_"):
facility = getattr(logging.handlers.SysLogHandler, facility_name, None)
if facility is None:
return False
file_path = str(path.resolve().parent)
else:
file_path = str(path.resolve())

# Check if file_path is a Unix socket
if os.path.exists(file_path):
mode = os.stat(file_path).st_mode
if stat.S_ISSOCK(mode):
return os.access(file_path, os.W_OK)
else:
return False
else:
return False

elif parsed_log_path.path:
# In case of udp or tcp with a facility specified
path = pathlib.Path(parsed_log_path.path)
facility_name = path.stem.upper()
if not facility_name.startswith("LOG_"):
return False

facility = getattr(logging.handlers.SysLogHandler, facility_name, None)
if facility is None:
return False
else:
return True

else:
# This is the case of udp or tcp without a facility specified
return True
return False


def is_writeable(path, check_parent=False):
Expand Down