如何为不同执行组动态分配独立日志文件(Loguru 实现)

8次阅读

如何为不同执行组动态分配独立日志文件(Loguru 实现)

使用 loguru 为数据处理流水线(如 store1_sales、store1_warehouses)创建隔离的日志文件,通过 bind() 绑定任务标识 + 自定义 Filter 实现按执行组自动路由日志到专属文件,并支持独立轮转策略。

在构建多管道数据处理系统(如 etl:extract → transform → load)时,将日志按业务维度(例如 “store1_sales”、”store2_warehouses”)物理隔离,不仅能提升故障排查效率,还能实现精细化运维——比如为高频流水线配置更激进的轮转(如每天),而低频任务保留更长日志周期(如30天)。Loguru 原生不提供“命名子 logger”,但可通过 Handler + Filter + bind 三者协同实现等效效果。

核心思路是:
✅ 为每个执行组预先注册一个专用 Handler(指向唯一日志文件 + 独立 rotation/retention);
✅ 使用 filter 函数精准拦截仅属于该组的日志记录(基于 record[“extra”] 中的绑定键值);
✅ 在各模块中通过 logger.bind(task=”xxx”) 创建轻量级上下文化 logger 实例,避免全局污染。

以下为可直接落地的实践方案:

1. 全局初始化(推荐在主入口或 config 模块中执行一次)

from loguru import logger  # 为每个 pipeline 注册独立 handler logger.add(     "logs/store1_sales.log",     level="INFO",     rotation="1 week",     retention="90 days",     compression="zip",     filter=lambda rec: rec["extra"].get("task") == "store1-sales",     serialize=True,     encoding="utf-8" )  logger.add(     "logs/store1_warehouses.log",     level="INFO",     rotation="30 days",     retention="180 days",     filter=lambda rec: rec["extra"].get("task") == "store1-warehouses",     serialize=True,     encoding="utf-8" )  logger.add(     "logs/store2_sales.log",     level="INFO",     rotation="1 day",  # 高频流水线启用日粒度轮转     retention="7 days",     filter=lambda rec: rec["extra"].get("task") == "store2-sales",     serialize=True,     encoding="utf-8" )

2. 各业务模块中按需绑定(解耦且线程安全)

# extract/store1/sales.py from loguru import logger  logger_store1_sales = logger.bind(task="store1-sales")  def store1_extract_sales():     logger_store1_sales.info("Started extracting sales data from API")     # ... processing logic     logger_store1_sales.success("Extraction completed, rows={count}", count=1247)
# transform/store1/sales.py from loguru import logger  logger_store1_sales = logger.bind(task="store1-sales")  # 复用相同 task 标识  def store1_convert_sales():     logger_store1_sales.debug("applying currency conversion and deduplication")     # ... transformation logic     logger_store1_sales.info("Converted {rows} records", rows=1247)
# load/store1/sales.py from loguru import logger  logger_store1_sales = logger.bind(task="store1-sales")  def store1_load_sales():     logger_store1_sales.info("Inserting into warehouse.sales table...")     # ... DB insertion     logger_store1_sales.success("Loaded {n} rows successfully", n=1247)

⚠️ 关键注意事项: filter 函数中务必使用 rec[“extra”].get(“task”) 而非 rec[“extra”][“task”],避免因未绑定导致 KeyError; 所有同属一个 pipeline 的模块必须使用完全一致的 task 字符串(如统一用 “store1-sales” 而非混用 “store1_sales” 或 “STORE1-SALES”); bind() 返回的是新 logger 实例,无状态共享,天然支持多线程/协程场景; 若需动态创建 handler(如 pipeline 名称来自配置),可用循环注册,但需确保 handler ID 不重复(Loguru 会自动分配,无需手动管理)。

最终,store1_sales.log 将只包含所有 logger.bind(task=”store1-sales”) 发出的日志,且按周轮转压缩;其他流水线日志互不干扰。这种设计既保持了代码的模块化与路径无关性(各 .py 文件可分散在任意目录),又实现了日志治理的强隔离性与高灵活性。

text=ZqhQzanResources