\$\begingroup\$

Today I've got a small FlowLogger for you to review. The idea is not to log plain messages but to focus on the flow of the app, which may contain items such as:

  • state - a JSON snapshot of variables etc.
  • event - something worth logging has occurred
  • started - the flow has started
  • altered - the flow has been altered by some condition, such as an if
  • canceled - the flow has been canceled before it ran to completion
  • faulted - the flow experienced an exception
  • completed - the flow has successfully run to completion

Implementation

The APIs are implemented by the FlowLogger, which is built on top of the standard logging module without exposing its original methods:

import json
import logging
import functools
import pyodbc
from typing import List, Optional
from timeit import default_timer as timer
from datetime import datetime
from logging import Handler

class FlowLogger:

    def __init__(self, name: str, handlers: List[logging.Handler]):
        # A standalone Logger keeps the standard logging methods hidden from callers.
        self._logger = logging.Logger(name)
        for h in handlers:
            self._logger.addHandler(h)

    def state(self, **kwargs):
        # No positional args: a stray () would make `msg % args` fail in formatters.
        self._logger.info(json.dumps(kwargs, sort_keys=True), extra={"flow": FlowLogger.state.__name__})

    def event(self, name: str):
        # Reference the method itself; calling event() here would raise a TypeError.
        self._logger.info(name, extra={"flow": FlowLogger.event.__name__})

    def started(self, name: Optional[str] = None, **kwargs):
        self._logger.info(name, extra={**kwargs, "flow": FlowLogger.started.__name__})

    def altered(self, reason: str):
        self._logger.info(reason, extra={"flow": FlowLogger.altered.__name__})

    def canceled(self, reason: Optional[str]):
        self._logger.warning(reason, extra={"flow": FlowLogger.canceled.__name__})

    def faulted(self, **kwargs):
        # exception() attaches the exception that is currently being handled.
        self._logger.exception(None, extra={**kwargs, "flow": FlowLogger.faulted.__name__})

    def completed(self, **kwargs):
        self._logger.info(None, extra={**kwargs, "flow": FlowLogger.completed.__name__})
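The FlowLogger relies on the `extra` argument of the standard logging calls, whose keys are copied onto the LogRecord as plain attributes. A minimal sketch of that mechanism, assuming a hypothetical in-memory `ListHandler` and a made-up `demo_api` name (neither is part of the code under review):

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical helper: keeps emitted records in memory for inspection."""

    def __init__(self):
        super().__init__(logging.INFO)
        self.records = []

    def emit(self, record):
        self.records.append(record)


def capture_demo():
    handler = ListHandler()
    # Standalone logger, created the same way FlowLogger creates its own.
    logger = logging.Logger("demo")
    logger.addHandler(handler)

    # Keys of `extra` become attributes of the LogRecord.
    logger.info("Custom flow.", extra={"flow": "started", "api": "demo_api"})

    record = handler.records[0]
    return record.flow, record.api, record.getMessage()
```

One thing to keep in mind with this approach: the logging module raises a KeyError when an `extra` key collides with a built-in record attribute such as `message` or `module`, so the keyword arguments passed to `started`, `faulted` and `completed` need to avoid those names.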


To avoid repeating myself when logging the flow of entire methods, the telemetry decorator takes over:

def telemetry(flow: FlowLogger):
    def decorate(decoratee):
        @functools.wraps(decoratee)
        def wrapper(*decoratee_args, **decoratee_kwargs):
            with FlowScope(flow, decoratee.__name__):
                return decoratee(*decoratee_args, **decoratee_kwargs)
        return wrapper
    return decorate
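As a possible alternative to a hand-written decorator, the standard library's `contextlib.ContextDecorator` lets a single class act as both a context manager and a decorator. A minimal sketch, assuming a hypothetical `RecordingScope` that appends to a list instead of logging; the API name has to be passed explicitly here because the scope is created before it sees the decorated function:

```python
from contextlib import ContextDecorator


class RecordingScope(ContextDecorator):
    """Hypothetical scope that appends events to a list instead of logging."""

    def __init__(self, events, api):
        self.events = events
        self.api = api

    def __enter__(self):
        self.events.append(("started", self.api))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append(("faulted" if exc_type else "completed", self.api))
        return False  # never swallow the exception


events = []


@RecordingScope(events, "divide")
def divide(a, b):
    return a / b
```

The trade-off is that `ContextDecorator` reuses the same scope instance for every call of the decorated function, so any per-call state (such as a start time) would be shared between calls. The original `telemetry` decorator avoids this by creating a fresh scope on each call.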

It's supported by the FlowScope, which handles the time measurement and exception handling:

class FlowScope:
    def __init__(self, flow: FlowLogger, api: str):
        self.flow = flow
        self.api = api
        self.start = timer()

    def __enter__(self):
        # Use the injected logger, not a module-level `flow` variable.
        self.flow.started(api=self.api)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets the exception propagate after it has been logged.
        if exc_type:
            self.flow.faulted(api=self.api, elapsed=timer() - self.start)
        else:
            self.flow.completed(api=self.api, elapsed=timer() - self.start)
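FlowScope starts its timer in `__init__`, so the measured time would include any gap between constructing the scope and entering the `with` block. A minimal sketch of starting the timer in `__enter__` instead, assuming a hypothetical `TimedScope` with an injectable clock so the timing can be checked deterministically:

```python
from timeit import default_timer


class TimedScope:
    """Hypothetical stand-in for FlowScope that only measures elapsed time."""

    def __init__(self, clock=default_timer):
        self.clock = clock  # injectable so tests can supply fake timestamps
        self.start = None
        self.elapsed = None
        self.faulted = False

    def __enter__(self):
        self.start = self.clock()  # timing begins when the block is entered
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = self.clock() - self.start
        self.faulted = exc_type is not None
        return False  # let any exception propagate
```

With the decorator shown earlier the two variants behave the same, because the scope is constructed and entered on the same line. The difference only shows up when a scope object is created ahead of time.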


The output is handled by the SqlServerHandler, which writes to the following table:

    create table log(
        [id] int identity(1,1) primary key,
        [timestamp] datetime2,
        [logger] nvarchar(50),
        [module] nvarchar(200),
        [api] nvarchar(200),
        [flow] nvarchar(50),
        [elapsed] float,
        [message] nvarchar(max),
        [line] int
    )

class SqlServerHandler(Handler):
    def __init__(self, server: str, database: str, username: str, password: str):
        super().__init__(logging.INFO)
        connection = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}")
        self.db = connection.cursor()

    def emit(self, record):
        if record.exc_info:
            record.exc_text = logging.Formatter().formatException(record.exc_info)

        self.db.execute(
            "INSERT INTO log([timestamp], [logger], [module], [api], [flow], [elapsed], [message], [line]) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            datetime.fromtimestamp(record.created),
            record.name,
            record.module,
            getattr(record, "api", record.funcName),
            getattr(record, "flow", None),
            getattr(record, "elapsed", None),
            record.exc_text if record.exc_text else record.msg,
            record.lineno
        )
        self.db.commit()
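For testing without a running SQL Server, the same handler idea can be tried against an in-memory SQLite database. This is a sketch under stated assumptions, not the handler from the question: `SqliteHandler` is a hypothetical name, the column types are SQLite approximations of the table above, and `sqlite3` is swapped in for `pyodbc`.

```python
import logging
import sqlite3
from datetime import datetime


class SqliteHandler(logging.Handler):
    """Hypothetical SQLite twin of SqlServerHandler, for local tests only."""

    def __init__(self, connection):
        super().__init__(logging.INFO)
        self.connection = connection
        self.connection.execute(
            "CREATE TABLE IF NOT EXISTS log("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, logger TEXT, "
            "module TEXT, api TEXT, flow TEXT, elapsed REAL, message TEXT, line INTEGER)"
        )

    def emit(self, record):
        try:
            self.connection.execute(
                "INSERT INTO log(timestamp, logger, module, api, flow, elapsed, message, line) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                (
                    datetime.fromtimestamp(record.created).isoformat(),
                    record.name,
                    record.module,
                    getattr(record, "api", record.funcName),
                    getattr(record, "flow", None),
                    getattr(record, "elapsed", None),
                    record.getMessage(),
                    record.lineno,
                ),
            )
            self.connection.commit()
        except Exception:
            # Standard logging convention: report emit failures via handleError.
            self.handleError(record)


connection = sqlite3.connect(":memory:")
logger = logging.Logger("sqlite-demo")
logger.addHandler(SqliteHandler(connection))
logger.info("Custom flow.", extra={"flow": "started", "elapsed": 0.25})
rows = connection.execute("SELECT logger, flow, elapsed, message FROM log").fetchall()
```

Wrapping the database call in `try`/`except` and delegating to `handleError` keeps a database outage from crashing the application being logged, which may also be worth considering for the SQL Server version.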


Example

This is the quick-and-dirty code I use to test it:


def initialize_telemetry() -> FlowLogger:
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(style="{", fmt="{asctime} | {module}.{funcName} | {flow} | {message}", defaults={"flow": "<flow>", "message": "<message>"}))
    sql_server_handler = SqlServerHandler(server="localhost,1433", database="master", username="sa", password="***")

    flow = FlowLogger("test-logger", [stream_handler, sql_server_handler])

    try:
        return flow
    finally:
        flow.completed()


flow = initialize_telemetry()


@telemetry(flow)
def flow_decorator_test():
    raise ValueError
    pass


def flow_test():
    flow.started("Custom flow.")
    flow.state(foo="bar")

    if True:
        flow.altered("The value was true.")

    try:
        raise ValueError
    except ValueError:
        flow.faulted()

    flow.completed()


if __name__ == "__main__":
    flow_test()
    flow_decorator_test()

What do you think? Would you say I'm doing something terribly wrong here?

\$\endgroup\$
  • \$\begingroup\$ There is a new version, a follow-up of this question here. \$\endgroup\$ Commented Oct 25, 2023 at 10:43
