Просмотр исходного кода

Upgrade to journald's native journal protocol

Unrud 2 лет назад
Родитель
Сommit
9276c65462
1 измененных файлов с 71 добавлено и 1 удалено
  1. 71 1
      radicale/log.py

+ 71 - 1
radicale/log.py

@@ -25,11 +25,16 @@ Log messages are sent to the first available target of:
 
 """
 
+import contextlib
 import logging
 import os
+import socket
+import struct
 import sys
 import threading
-from typing import Any, Callable, ClassVar, Dict, Iterator, Union
+import time
+from typing import (Any, Callable, ClassVar, Dict, Iterator, Optional, Tuple,
+                    Union)
 
 from radicale import types
 
@@ -65,6 +70,9 @@ class IdentLogRecordFactory:
         if current_thread.name and main_thread != current_thread:
             ident += "/%s" % current_thread.name
         record.ident = ident  # type:ignore[attr-defined]
+        record.tid = None  # type:ignore[attr-defined]
+        if sys.version_info >= (3, 8):
+            record.tid = current_thread.native_id
         return record
 
 
@@ -75,14 +83,76 @@ class ThreadedStreamHandler(logging.Handler):
     terminator: ClassVar[str] = "\n"
 
     _streams: Dict[int, types.ErrorStream]
+    _journal_stream_id: Optional[Tuple[int, int]]
+    _journal_socket: Optional[socket.socket]
 
     def __init__(self) -> None:
         super().__init__()
         self._streams = {}
+        self._journal_stream_id = None
+        with contextlib.suppress(TypeError, ValueError):
+            dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1)
+            self._journal_stream_id = int(dev), int(inode)
+        self._journal_socket = None
+        if self._journal_stream_id and hasattr(socket, "AF_UNIX"):
+            journal_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+            try:
+                journal_socket.connect("/run/systemd/journal/socket")
+            except OSError:
+                journal_socket.close()
+            else:
+                self._journal_socket = journal_socket
+
+    def _detect_journal(self, stream):
+        if not self._journal_stream_id:
+            return False
+        try:
+            stat = os.fstat(stream.fileno())
+        except Exception:
+            return False
+        return self._journal_stream_id == (stat.st_dev, stat.st_ino)
+
+    @staticmethod
+    def _encode_journal(data):
+        msg = b""
+        for key, value in data.items():
+            if key is None:
+                continue
+            key = key.encode()
+            value = str(value).encode()
+            if b"\n" in value:
+                msg += (key + b"\n" +
+                        struct.pack("<Q", len(value)) + value + b"\n")
+            else:
+                msg += key + b"=" + value + b"\n"
+        return msg
+
+    def _emit_journal(self, record):
+        priority = {"DEBUG": 7,
+                    "INFO": 6,
+                    "WARNING": 4,
+                    "ERROR": 3,
+                    "CRITICAL": 2}.get(record.levelname, 4)
+        timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.%%03dZ",
+                                  time.gmtime(record.created)) % record.msecs
+        data = {"PRIORITY": priority,
+                "TID": record.tid,
+                "SYSLOG_IDENTIFIER": record.name,
+                "SYSLOG_FACILITY": 1,
+                "SYSLOG_PID": record.process,
+                "SYSLOG_TIMESTAMP": timestamp,
+                "CODE_FILE": record.pathname,
+                "CODE_LINE": record.lineno,
+                "CODE_FUNC": record.funcName,
+                "MESSAGE": self.format(record)}
+        self._journal_socket.sendall(self._encode_journal(data))
 
     def emit(self, record: logging.LogRecord) -> None:
         try:
             stream = self._streams.get(threading.get_ident(), sys.stderr)
+            if self._journal_socket and self._detect_journal(stream):
+                self._emit_journal(record)
+                return
             msg = self.format(record)
             stream.write(msg)
             stream.write(self.terminator)