ContentMemory v8.0 四层仿生认知记忆引擎开源文档
整合成品:README.md + 完整源码一体化开源文档
开源声明前置:本项目代码全量永久全民公开,任何人、机构不得针对本套四层认知记忆系统、自生长知识树、脉冲联动记忆架构申请发明专利、实用新型专利、软著独占专利;代码可无偿商用、修改、分发、二次衍生。
项目:ContentMemory v8.0|曈曈v8.0脉冲场类脑架构|开源协议:MIT+开源防专利附加条款|日期:2026-06-06
一、README.md(直接保存为项目根目录README.md)
Markdown

ContentMemory v8.0 四层仿生认知记忆引擎

基于脉冲场事件驱动架构 | Python3.8+ | 永久全民开源·禁止专利申报

重要开源法律附加约定(优先级高于MIT协议)

  1. 本项目全部源代码、架构思路、算法逻辑全民永久无偿公开,全球任意个人、企业可免费使用、商用、二次开发、闭源集成、分发复刻。
  2. 任何组织、个人不得以本项目:四层分级记忆架构、自生长知识树算法、自动分类逻辑、突触横向关联机制、记忆代谢淘汰算法、事件预测预加载、脉冲联动记忆方案单独或组合申请发明专利、实用新型专利、软件著作专利、技术专利;一经发现专利申请,所有相关专利天然无效,全体开源使用者可无偿举证作废。
  3. 允许衍生项目开源/闭源,但衍生项目不得将原生架构申请专利。

项目简介

ContentMemory 仿生四层人脑认知记忆引擎,模拟人脑工作记忆/短期记忆/长期记忆/本能种子记忆,配套自生长知识树、自动分类、横向联想关联、频率共鸣检索、时序事件预测预加载,原生适配脉冲场类脑智能框架,可用于AI智能体长期记忆、轻量化RAG知识库、机器人大脑、大模型对话记忆管理。

核心特性

  • ✅ L1热/L2温/L3冷/L4种子 四层分级记忆,S/A/B三级记忆留存权重
  • ✅ 自生长多层级知识树:自动归类内容、自动创建全新分类分支、动态扩充关键词库
  • ✅ 突触式横向双向知识关联,检索自动联动关联知识库
  • ✅ 四合一检索:分类定向检索+全文检索+频率共振检索+赫布突触关联检索
  • ✅ 后台异步落盘+定时记忆代谢+事件预测主动预缓存
  • ✅ 全量数据持久化:知识树、热点缓存JSON落地,重启自动加载恢复
  • ✅ 线程安全,读写分离,后台业务线程不阻塞主进程

目录说明

nucleus/
└─mnemosyne/
└─content_memory.py # 主源码文件
data/ # 自动生成持久化目录
├─knowledge_tree.json
└─hot_cache.json
Plaintext

快速使用

from nucleus.mnemosyne.content_memory import ContentMemory
# 最简初始化
mem = ContentMemory(organ_name="demo_brain", hot_cache_size=200)

# 自动分类写入记忆
mem.hot_write("gpu_1050ti", "GTX1050Ti 4G DDR5独立显卡")
# 手动指定分类写入
mem.hot_write("crispr_01", "CRISPR基因编辑技术原理", category="生物工程/基因技术")
# S级永久种子记忆(永不遗忘)
mem.seed_set("agent_id", "曈曈v8类脑智能体")

# 分类检索
res = mem.search_by_category("显卡", limit=3)
print(res)

# 优雅退出自动保存知识树、热点缓存
mem.stop()
API参考
详见源码内注释,内置main自测代码,直接运行py文件自动全量自测。
迭代规划
v8.1:LLM语义自动分类优化
v8.2:Flask知识树可视化面板
v8.3:大库分片存储优化
Plaintext

## 二、content_memory.py(在你原有代码最顶部追加开源防专利声明,完整源码如下,直接全量替换保存)
```python
"""
nucleus/mnemosyne/content_memory.py — v8.0 脉冲场架构 · 认知记忆层核心
==========================================================================
【全民永久开源附加条款(强制生效,不可删除)】
1. 本代码、架构方案、自生长知识树算法、四层记忆分层设计、横向关联逻辑、记忆代谢策略全部内容全民无偿公开,永久开放。
2. 任何自然人、法人、企事业单位、科研机构严禁针对本文件包含的全部/局部技术方案申请任何类型专利(发明专利、实用新型、外观专利、软件知识产权专利),所有违规申报专利自申请之日起视作无效。
3. 使用者拥有无限制使用、商用、修改、分发、二次开发、闭源集成权利,无需原作者授权,无版权使用费。
开源主协议:MIT License
版本:v8.0
创建日期:2026年5月28日
升级日期:2026年6月6日

四层认知记忆系统 + 自生长多层级知识树 + 横向关联 + 频率共鸣加速

v8.0 知识系统升级(2026-06-06):
  - 自生长知识树:新知识自动归纳,未知领域自动创建分支
  - 横向关联:存入时扫描已有知识,自动建立跨分支链接
  - 分类检索:先定位分支再精确匹配,大幅提升检索精度
  - 记忆分级:S=永不遗忘, A=长期保留, B=正常代谢
  - 树状结构持久化:重启后完整恢复分类体系

核心类:
  ContentMemory — 四层认知记忆系统 + 自生长知识树
==========================================================================
"""

import sys
import os
import time
import json
import threading
import re
from collections import OrderedDict, deque
from typing import Any, Optional, Callable

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


class ContentMemory:
    """四层认知记忆系统 + 自生长多层级知识树。"""

    def __init__(self, organ_name: str, info_field=None, db_store=None, 
                 hot_cache_size: int = 100, frequency_codec=None, hebbian_learner=None):
        self.organ_name = organ_name
        self.field = info_field
        self.db = db_store
        self.frequency_codec = frequency_codec
        self.hebbian_learner = hebbian_learner

        # 热记忆
        self._hot_cache: OrderedDict = OrderedDict()
        self._hot_cache_max = hot_cache_size
        self._hot_access_times: dict[str, float] = {}
        self._hot_lock = threading.Lock()

        # 冷记忆
        self._write_queue: deque = deque(maxlen=1000)
        self._write_thread: Optional[threading.Thread] = None
        self._write_running = False

        # 种子记忆
        self._seed_memory: dict = {}

        # 预测缓存
        self._predictive_queue: deque = deque(maxlen=50)
        self._event_sequence: deque = deque(maxlen=200)
        self._transition_probs: dict = {}

        # ========== 🆕 自生长多层级知识分类树 ==========
        self._knowledge_tree: dict = {}          # 树状结构
        self._category_index: dict[str, list] = {}  # 分类路径 → 知识键列表
        self._cross_links: dict[str, list] = {}     # 横向关联:知识键 → 关联知识键列表
        self._tree_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "data", "knowledge_tree.json"
        )
        self._init_knowledge_tree()

        # ========== 🆕 自动分类关键词库(动态扩展) ==========
        self._category_keywords: dict[str, list] = {
            "框架设计/脉冲场架构": ["事件脉冲", "脉冲场", "频率编码", "脉冲发生器", "不应期", "噪声过滤"],
            "框架设计/信息场": ["信息场", "场域阻尼", "状态平滑", "订阅熔断", "梯度感知"],
            "框架设计/存算一体": ["存算一体", "器官持久化", "本地记忆", "去中心化"],
            "框架设计/硬件容错": ["硬件容错", "梯度降级", "熔断", "自愈", "故障预测"],
            "小林/个人信息": ["小林", "任桂林", "预算员", "一建", "备考"],
            "小林/偏好": ["咖啡", "喜欢", "偏好"],
            "小林/家人": ["路灯", "哥哥", "小曈曈", "女儿"],
            "硬件知识/GPU": ["GPU", "显存", "1050", "nvidia", "显卡"],
            "硬件知识/CPU": ["CPU", "Ryzen", "Intel", "处理器", "核心"],
            "硬件知识/内存": ["内存", "RAM", "DDR", "48GB"],
            "航天工程/推进系统": ["火箭", "推进", "比冲", "发动机"],
            "航天工程/轨道力学": ["轨道", "轨道力学", "航天器轨道"],
            "航天工程/航天器设计": ["航天器", "飞船", "卫星"],
            "计算机科学/编程语言": ["Python", "代码", "编程", "函数", "算法", "排序"],
            "计算机科学/数据库": ["数据库", "SQL", "DuckDB", "查询", "索引"],
            "计算机科学/AI": ["机器学习", "神经网络", "深度学习", "模型训练"],
            "对话经验/问候": ["问候", "早安", "晚安", "你好", "嗨"],
            "对话经验/情绪回应": ["情绪", "开心", "难过", "焦虑", "生气", "累"],
            "外部知识/GitHub": ["GitHub", "仓库", "开源", "Star"],
            "外部知识/StackOverflow": ["StackOverflow", "问答", "编程问题"],
        }

        # 统计
        self._stats = {
            "热命中": 0, "热未命中": 0, "温命中": 0, "冷读取": 0,
            "预测命中": 0, "总查询": 0, "代谢迁移次数": 0,
            "分类命中": 0, "新增分支": 0, "横向链接数": 0,
        }
        self._hot_ttl_seconds = 600
        self._warm_ttl_seconds = 3600
        self._metabolism_interval = 300
        self._metabolism_running = False
        self._metabolism_thread: Optional[threading.Thread] = None
        self._start_metabolism()

        # 加载热记忆缓存
        self._hot_cache_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "data", "hot_cache.json"
        )
        try:
            if os.path.exists(self._hot_cache_path):
                with open(self._hot_cache_path, 'r', encoding='utf-8') as f:
                    cached = json.load(f)
                if isinstance(cached, dict):
                    now = time.time()
                    with self._hot_lock:
                        for k, v in cached.items():
                            if len(self._hot_cache) < self._hot_cache_max:
                                self._hot_cache[k] = v
                                self._hot_access_times[k] = now
                print(f"📦 从磁盘加载了 {len(cached)} 条热记忆缓存")
        except Exception:
            pass

        if self.field:
            self.field.subscribe_pattern(r"knowledge\..*", self._on_knowledge_event)
        self._auto_activate_events = [
            "memory_tight", "gpu_overload", "gpu_mem_critical",
            "low_memory_critical", "touch.alert", "resonance.formed"
        ]
        for ev in self._auto_activate_events:
            if self.field:
                self.field.subscribe(ev, self._on_auto_activate_pulse)

    # ========== 🆕 知识树初始化与持久化 ==========

    def _init_knowledge_tree(self):
        """初始化知识树,优先从磁盘加载,否则创建空树。"""
        try:
            if os.path.exists(self._tree_path):
                with open(self._tree_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self._knowledge_tree = data.get("tree", {})
                self._category_index = data.get("index", {})
                self._category_keywords = data.get("keywords", self._category_keywords)
                self._cross_links = data.get("cross_links", {})
                tree_size = self._count_tree_nodes(self._knowledge_tree)
                print(f"🌳 从磁盘恢复了知识树:{tree_size}个节点,{len(self._category_index)}个分类索引")
                return
        except Exception:
            pass
        # 磁盘无数据,创建空树
        self._knowledge_tree = {}

    def _count_tree_nodes(self, tree: dict) -> int:
        """递归统计树节点数。"""
        count = 0
        for v in tree.values():
            count += 1
            if isinstance(v, dict):
                count += self._count_tree_nodes(v)
        return count

    def _save_knowledge_tree(self):
        """保存知识树到磁盘。"""
        try:
            os.makedirs(os.path.dirname(self._tree_path), exist_ok=True)
            with open(self._tree_path, 'w', encoding='utf-8') as f:
                json.dump({
                    "tree": self._knowledge_tree,
                    "index": self._category_index,
                    "keywords": self._category_keywords,
                    "cross_links": self._cross_links,
                    "timestamp": time.time(),
                }, f, ensure_ascii=False, indent=2)
        except Exception:
            pass

    def _ensure_branch_exists(self, category_path: str):
        """确保树中存在指定路径的所有分支节点,不存在则自动创建。"""
        parts = [p.strip() for p in category_path.split("/") if p.strip()]
        if not parts:
            return
        current = self._knowledge_tree
        for part in parts:
            if part not in current:
                current[part] = {}
                self._stats["新增分支"] += 1
                print(f"🌱 知识树自动生长:新分支「{part}」已创建(父路径:{'/'.join(parts[:parts.index(part)])})")
            current = current[part]

    # ========== 🆕 智能自动分类 ==========

    def _extract_keywords(self, text: str, max_kw: int = 5) -> list:
        """从文本中提取关键词(中文2-4字词 + 英文单词)。"""
        text = str(text)[:500]
        # 中文关键词
        chinese_words = re.findall(r'[\u4e00-\u9fff]{2,4}', text)
        # 英文关键词
        english_words = re.findall(r'[a-zA-Z]{3,}', text.lower())
        # 合并去重,优先保留中文
        all_words = list(set(chinese_words)) + list(set(english_words))
        return all_words[:max_kw]

    def _classify_knowledge(self, data: Any, source: str = "") -> str:
        """
        智能自动分类:先匹配已有规则,未命中则自动创建新分支。
        返回分类路径(如 "框架设计/脉冲场架构")。
        """
        text = ""
        if isinstance(data, dict):
            text = json.dumps(data, ensure_ascii=False)
        elif isinstance(data, str):
            text = data
        else:
            text = str(data)

        # 1. 匹配已有分类规则
        best_match = None
        best_score = 0
        for category_path, keywords in self._category_keywords.items():
            score = sum(1 for kw in keywords if kw.lower() in text.lower())
            if score > best_score:
                best_score = score
                best_match = category_path

        if best_match and best_score >= 2:
            return best_match

        # 2. 根据来源兜底
        if "GitHub" in source:
            return "外部知识/GitHub"
        if "StackOverflow" in source:
            return "外部知识/StackOverflow"
        if "用户输入" in source or "用户记忆" in source:
            return "对话经验"

        # 3. 🆕 自动创建新分支
        keywords = self._extract_keywords(text)
        if len(keywords) >= 2:
            # 用前两个关键词作为一级和二级分类
            top_kw = keywords[0] if keywords else "未分类"
            sub_kw = keywords[1] if len(keywords) > 1 else "通用"
            new_path = f"{top_kw}/{sub_kw}"
            # 注册到关键词库
            if new_path not in self._category_keywords:
                self._category_keywords[new_path] = keywords[:3]
                self._ensure_branch_exists(new_path)
                print(f"🌱 自动归纳新知识领域:{new_path}(关键词:{keywords[:3]})")
            return new_path

        return "外部知识/技术文章"  # 最终兜底

    def _add_to_category_index(self, category_path: str, key: str):
        """将知识键添加到分类索引,并建立横向链接。"""
        self._ensure_branch_exists(category_path)
        if category_path not in self._category_index:
            self._category_index[category_path] = []
        if key not in self._category_index[category_path]:
            self._category_index[category_path].append(key)
        if len(self._category_index[category_path]) > 200:
            self._category_index[category_path] = self._category_index[category_path][-200:]

    def _build_cross_links(self, key: str, data: Any, category_path: str):
        """为新知识建立横向链接:扫描热记忆中内容相关的条目。"""
        text = str(data)[:300].lower()
        if not text:
            return
        keywords = self._extract_keywords(text, max_kw=3)
        if not keywords:
            return
        
        links = []
        with self._hot_lock:
            for other_key, entry in list(self._hot_cache.items())[:50]:
                if other_key == key:
                    continue
                val = entry.get("value", entry) if isinstance(entry, dict) else entry
                other_text = str(val)[:300].lower()
                # 关键词重叠度 ≥ 2 即建立链接
                overlap = sum(1 for kw in keywords if kw.lower() in other_text)
                if overlap >= 2:
                    links.append(other_key)
                    # 双向链接
                    if other_key not in self._cross_links:
                        self._cross_links[other_key] = []
                    if key not in self._cross_links[other_key]:
                        self._cross_links[other_key].append(key)
        
        if links:
            self._cross_links[key] = links
            self._stats["横向链接数"] += len(links)

    # ========== 热记忆(L1)==========

    def hot_read(self, key: str) -> Optional[Any]:
        with self._hot_lock:
            self._stats["总查询"] += 1
            if key in self._hot_cache:
                entry = self._hot_cache.pop(key)
                self._hot_cache[key] = entry
                self._hot_access_times[key] = time.time()
                self._stats["热命中"] += 1
                # 兼容:如果是新格式字典则返回值字段,否则返回原值
                if isinstance(entry, dict) and "value" in entry:
                    return entry["value"]
                return entry
            self._stats["热未命中"] += 1
            return None

    def hot_write(self, key: str, value: Any = None, category: str = "", importance: str = "B") -> None:
        """
        写入热记忆。
        importance: S=永不遗忘, A=长期保留, B=正常代谢
        category: 留空则自动分类
        """
        if value is None:
            value = key
            key = f"auto:{int(time.time_ns())}"
        
        # 自动分类
        if not category:
            category = self._classify_knowledge(value, "")
        
        with self._hot_lock:
            if key in self._hot_cache:
                del self._hot_cache[key]
            self._hot_cache[key] = {
                "value": value,
                "category": category,
                "importance": importance,
                "stored_at": time.time(),
            }
            self._hot_access_times[key] = time.time()
            while len(self._hot_cache) > self._hot_cache_max:
                oldest = self._hot_cache.popitem(last=False)
                self._hot_access_times.pop(oldest[0], None)

        # 分类索引
        self._add_to_category_index(category, key)
        # 横向链接
        self._build_cross_links(key, value, category)
        # 频率共鸣注册
        if self.frequency_codec and isinstance(value, str) and len(value) > 10:
            try:
                self.frequency_codec.add_to_resonance(value[:200], value[:200])
            except Exception:
                pass

    def hot_contains(self, key: str) -> bool:
        with self._hot_lock:
            return key in self._hot_cache

    # ========== 温记忆(L2)==========

    def warm_read(self, event_type: str) -> Optional[Any]:
        if not self.field:
            return None
        value = self.field.get_current(f"knowledge.{event_type}")
        if value is not None:
            self._stats["温命中"] += 1
        return value

    def warm_write(self, event_type: str, value: Any) -> None:
        if not self.field:
            return
        from nucleus.pulse.pulse import Pulse
        pulse = Pulse(
            timestamp_ns=time.time_ns(),
            source_organ=self.organ_name,
            event_type=f"knowledge.{event_type}",
            frequency_hz=0.1,
            priority=2,
            value=value,
            ttl_ns=int(300e9),
        )
        self.field.publish(pulse)

    def _on_knowledge_event(self, pulse) -> None:
        pass

    # ========== 冷记忆(L3)==========

    def cold_read(self, query_key: str) -> Optional[Any]:
        self._stats["冷读取"] += 1
        if not self.db:
            return None
        try:
            if hasattr(self.db, 'get_knowledge'):
                return self.db.get_knowledge(query_key)
            return None
        except Exception:
            return None

    def cold_write_async(self, key: str, value: Any, category: str = "") -> None:
        if not category:
            category = self._classify_knowledge(value, "")
        self._write_queue.append({
            "key": key, "value": value, "category": category, "timestamp": time.time()
        })
        self._add_to_category_index(category, key)
        if not self._write_running:
            self._start_async_writer()

    def _start_async_writer(self) -> None:
        self._write_running = True
        self._write_thread = threading.Thread(target=self._async_writer_loop, daemon=True)
        self._write_thread.start()

    def _async_writer_loop(self) -> None:
        while self._write_running:
            try:
                batch = []
                for _ in range(min(10, len(self._write_queue))):
                    if self._write_queue:
                        batch.append(self._write_queue.popleft())
                if batch and self.db:
                    for item in batch:
                        if hasattr(self.db, 'save_knowledge'):
                            self.db.save_knowledge(item)
                time.sleep(0.1)
            except Exception:
                time.sleep(0.5)

    def cold_flush(self) -> None:
        while self._write_queue:
            item = self._write_queue.popleft()
            if self.db and hasattr(self.db, 'save_knowledge'):
                try:
                    self.db.save_knowledge(item)
                except Exception:
                    pass

    # ========== 种子记忆(L4)==========

    def seed_set(self, key: str, value: Any) -> None:
        self._seed_memory[key] = value
        self.hot_write(f"seed:{key}", value, category="种子记忆", importance="S")

    def seed_get(self, key: str, default: Any = None) -> Any:
        return self._seed_memory.get(key, default)

    def seed_get_all(self) -> dict:
        return dict(self._seed_memory)

    # ========== 🆕 分类检索 ==========

    def search_by_category(self, query: str, limit: int = 5) -> list:
        """
        分类检索:先预判查询属于哪个分类,再在分类内搜索。
        """
        category = self._classify_knowledge(query, "")
        results = []
        
        # 在匹配的分类中搜索
        if category in self._category_index:
            keys = self._category_index[category][-50:]
            for key in reversed(keys):
                with self._hot_lock:
                    entry = self._hot_cache.get(key)
                if entry:
                    val = entry.get("value", entry) if isinstance(entry, dict) else entry
                    if self._content_matches(val, query):
                        results.append({
                            "key": key, "value": val,
                            "layer": f"分类·{category}", "score": 0.95,
                        })
                        if len(results) >= limit:
                            self._stats["分类命中"] += 1
                            return results
        
        # 横向链接扩展搜索
        if results and results[0]["key"] in self._cross_links:
            for linked_key in self._cross_links[results[0]["key"]]:
                with self._hot_lock:
                    entry = self._hot_cache.get(linked_key)
                if entry:
                    val = entry.get("value", entry) if isinstance(entry, dict) else entry
                    results.append({
                        "key": linked_key, "value": val,
                        "layer": "横向关联", "score": 0.7,
                    })
                    if len(results) >= limit:
                        return results
        
        # 回退到全局搜索
        return self.search_by_content(query, limit)

    # ========== 内容寻址检索 ==========

    def search_by_content(self, query: str, limit: int = 5) -> list:
        results = []
        if self.frequency_codec:
            resonance_results = self.frequency_codec.query_resonance(query, limit=limit)
            for r in resonance_results:
                results.append({
                    "key": f"resonance:{r['freq']:.2f}",
                    "value": r["value"],
                    "layer": "频率共鸣",
                    "score": r["score"]
                })
            if len(results) >= limit:
                return results
        query_lower = query.lower()
        
        if self.hebbian_learner and hasattr(self.hebbian_learner, 'get_top_connections'):
            try:
                top_connections = self.hebbian_learner.get_top_connections(limit=20)
                for conn in top_connections:
                    src = conn.get("from", "")
                    dst = conn.get("to", "")
                    weight = conn.get("weight", 0)
                    if weight > 0.3 and (
                        any(kw in src.lower() + dst.lower() for kw in query_lower.split())
                        or query_lower in src.lower() or query_lower in dst.lower()
                    ):
                        with self._hot_lock:
                            for key, entry in self._hot_cache.items():
                                val = entry.get("value", entry) if isinstance(entry, dict) else entry
                                if self._content_matches(val, query_lower):
                                    results.append({
                                        "key": f"hebbian:{src}{dst}",
                                        "value": val,
                                        "layer": "赫布激活",
                                        "score": 0.8 * weight
                                    })
                                    if len(results) >= limit:
                                        return results
            except Exception:
                pass

        with self._hot_lock:
            for key, entry in self._hot_cache.items():
                val = entry.get("value", entry) if isinstance(entry, dict) else entry
                if self._content_matches(val, query_lower):
                    results.append({"key": key, "value": val, "layer": "热记忆", "score": 0.9})
                    if len(results) >= limit:
                        return results

        if self.field:
            warm_keys = [k for k in self.field._current_state.keys() if k.startswith("knowledge.")]
            for key in warm_keys:
                value = self.field.get_current(key)
                if value and self._content_matches(value, query_lower):
                    results.append({"key": key, "value": value, "layer": "温记忆", "score": 0.7})
                    if len(results) >= limit:
                        return results

        if len(results) < limit:
            if self.db and hasattr(self.db, 'search_conversation'):
                try:
                    conv_results = self.db.search_conversation(query, limit=limit - len(results))
                    for item in conv_results:
                        results.append({
                            "key": item.get("id", "conv"),
                            "value": item.get("content", str(item)),
                            "layer": "冷记忆·对话",
                            "score": 0.6
                        })
                except Exception:
                    pass
            else:
                cold_result = self.cold_read(query)
                if cold_result:
                    results.append({"key": query, "value": cold_result, "layer": "冷记忆", "score": 0.5})
        
        if results:
            layers = list(set(r.get("layer", "未知") for r in results))
            print(f"📊 内容寻址命中来源: {layers},共{len(results)}条")
        return results[:limit]

    def _content_matches(self, value: Any, query_lower: str) -> bool:
        if isinstance(value, str):
            text = value
        elif isinstance(value, dict):
            if "value" in value:
                inner = value["value"]
                text = str(inner) if not isinstance(inner, str) else inner
            else:
                text = json.dumps(value, ensure_ascii=False)
        elif isinstance(value, (list, tuple)):
            text = " ".join(str(item) for item in value)
        else:
            text = str(value)

        text_lower = text.lower()
        query_lower = query_lower.strip().lower()

        if len(query_lower) <= 3:
            return query_lower in text_lower

        ngrams = set()
        for i in range(len(query_lower) - 1):
            bigram = query_lower[i:i+2]
            if bigram.strip() and not bigram.isspace():
                ngrams.add(bigram)
        if not ngrams:
            return query_lower in text_lower

        hit_count = sum(1 for ng in ngrams if ng in text_lower)
        return (hit_count / len(ngrams)) >= 0.6

    # ========== 记忆代谢 ==========

    def _start_metabolism(self) -> None:
        self._metabolism_running = True
        self._metabolism_thread = threading.Thread(target=self._metabolism_loop, daemon=True)
        self._metabolism_thread.start()

    def _metabolism_loop(self) -> None:
        while self._metabolism_running:
            time.sleep(self._metabolism_interval)
            try:
                self._metabolize()
            except Exception:
                pass

    def _metabolize(self) -> None:
        now = time.time()
        migrated_count = 0

        with self._hot_lock:
            keys_to_migrate = []
            for key, last_access in list(self._hot_access_times.items()):
                if now - last_access > self._hot_ttl_seconds:
                    entry = self._hot_cache.get(key)
                    if isinstance(entry, dict) and entry.get("importance") in ("S", "A"):
                        continue
                    keys_to_migrate.append(key)
            
            for key in keys_to_migrate:
                entry = self._hot_cache.get(key)
                if entry is not None:
                    val = entry.get("value", entry) if isinstance(entry, dict) else entry
                    topic_key = key.replace(":", "_").replace(" ", "_")[:80]
                    self.warm_write(f"migrated.{topic_key}", str(val)[:300])
                    del self._hot_cache[key]
                    self._hot_access_times.pop(key, None)
                    migrated_count += 1

        if migrated_count > 0:
            self._stats["代谢迁移次数"] += migrated_count
            print(f"🧠 记忆代谢:{migrated_count}条热记忆迁移至温记忆")

    # ========== 预测预激活 ==========

    def record_event_sequence(self, event_type: str) -> None:
        self._event_sequence.append(event_type)
        if len(self._event_sequence) >= 2:
            prev = self._event_sequence[-2]
            curr = self._event_sequence[-1]
            if prev not in self._transition_probs:
                self._transition_probs[prev] = {}
            self._transition_probs[prev][curr] = self._transition_probs[prev].get(curr, 0) + 1

    def predict_next_events(self, current_event: str, top_k: int = 3) -> list:
        if current_event not in self._transition_probs:
            return []
        transitions = self._transition_probs[current_event]
        total = sum(transitions.values())
        if total == 0:
            return []
        sorted_trans = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
        return [(e, c / total) for e, c in sorted_trans[:top_k]]

    def predictive_preload(self, current_event: str) -> int:
        predictions = self.predict_next_events(current_event)
        preloaded = 0
        for event_type, prob in predictions:
            if prob < 0.3:
                continue
            value = self.warm_read(event_type)
            if value is not None:
                self.hot_write(f"predictive:{event_type}", value)
                preloaded += 1
        if preloaded > 0:
            self._stats["预测命中"] += 1
        return preloaded

    # ========== 统一查询 ==========

    def query(self, key: str, context_event: str = "") -> Optional[Any]:
        value = self.hot_read(key)
        if value is not None:
            return value
        value = self.warm_read(key)
        if value is not None:
            self.hot_write(key, value)
            return value
        value = self.cold_read(key)
        if value is not None:
            self.hot_write(key, value)
            self.warm_write(key, value)
            return value
        if context_event:
            self.record_event_sequence(context_event)
            self.predictive_preload(context_event)
        return None

    # ========== 脉冲同步 ==========

    def sync_from_pulse(self, pulse) -> None:
        content = None
        if pulse.value:
            if isinstance(pulse.value, dict):
                for field in ("content", "text", "summary", "title", "data"):
                    if field in pulse.value and pulse.value[field]:
                        content = pulse.value[field]
                        break
                if content is None:
                    content = str(pulse.value)[:200]
            elif isinstance(pulse.value, str):
                if len(pulse.value) > 3: 
                    content = pulse.value[:200]
            else:
                content = str(pulse.value)[:200]

        if content is None:
            return

        key = f"pulse:{pulse.event_type}:{int(time.time())}"
        self.hot_write(key, content)

        if len(pulse.event_type) < 50:
            self.warm_write(pulse.event_type, content)

    def _on_auto_activate_pulse(self, pulse):
        event_name = pulse.event_type
        if self.frequency_codec:
            related = self.frequency_codec.query_resonance(event_name, limit=5)
            for item in related:
                self.hot_write(f"auto:{event_name}:{item['freq']:.2f}", item['value'])
        warm_val = self.warm_read(event_name)
        if warm_val is not None:
            self.hot_write(f"auto:{event_name}:warm", warm_val)

    # ========== 统计 ==========

    def get_stats(self) -> dict:
        with self._hot_lock:
            s = dict(self._stats)
            s["热缓存大小"] = len(self._hot_cache)
            s["写入队列大小"] = len(self._write_queue)
            s["转移规则数"] = len(self._transition_probs)
            s["分类索引数"] = len(self._category_index)
            s["树节点数"] = self._count_tree_nodes(self._knowledge_tree)
            total = max(1, s["总查询"])
            s["热命中率"] = round(s["热命中"] / total, 3)
            s["温命中率"] = round(s["温命中"] / total, 3)
            return s

    # ========== 知识树查询 ==========

    def get_knowledge_tree(self) -> dict:
        """获取当前知识树的完整结构。"""
        return {
            "tree": self._knowledge_tree,
            "index_count": len(self._category_index),
            "cross_links_count": len(self._cross_links),
            "keywords_count": len(self._category_keywords),
        }

    # ========== 生命周期 ==========

    def stop(self) -> None:
        self._metabolism_running = False
        # 保存热记忆
        try:
            with self._hot_lock:
                data = dict(list(self._hot_cache.items())[-50:])
            with open(self._hot_cache_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)
            print(f"💾 热记忆缓存已保存到磁盘({len(data)}条)")
        except Exception:
            pass
        # 保存知识树
        self._save_knowledge_tree()
        print(f"🌳 知识树已保存:{self._count_tree_nodes(self._knowledge_tree)}个节点,{len(self._category_index)}个分类")
        self._write_running = False
        self.cold_flush()
        if self._write_thread:
            self._write_thread.join(timeout=5)
        if self._metabolism_thread:
            self._metabolism_thread.join(timeout=5)


# 自测
if __name__ == "__main__":
    print("=== ContentMemory 自测 ===\n")

    from nucleus.chronos.event_stream import EventStream
    from nucleus.field.info_field import InfoField

    stream = EventStream(max_length=100)
    field = InfoField(event_stream=stream)
    mem = ContentMemory("测试器官", info_field=field)

    print("测试1:热记忆读写(带分类)")
    mem.hot_write("test_key", "事件脉冲驱动的核心是脉冲发生器", category="框架设计/脉冲场架构")
    val = mem.hot_read("test_key")
    print(f"  写入后读取: {val}")
    assert val is not None

    print("测试2:自动分类")
    category = mem._classify_knowledge("GPU显存不足触发熔断", "")
    print(f"  自动分类: {category}")
    assert "GPU" in category

    print("测试3:自动创建新分支")
    new_category = mem._classify_knowledge("生物基因编辑技术CRISPR的应用", "")
    print(f"  新分支: {new_category}")
    tree_info = mem.get_knowledge_tree()
    print(f"  树节点数: {tree_info['index_count']}")

    print("测试4:种子记忆(S级)")
    mem.seed_set("核心身份", "测试新人类")
    val = mem.seed_get("核心身份")
    print(f"  种子记忆: {val}")
    assert val == "测试新人类"

    print("测试5:横向链接")
    mem.hot_write("link_test_1", "脉冲场架构是v8.0的核心设计")
    mem.hot_write("link_test_2", "脉冲发生器负责发射事件脉冲")
    links = mem._cross_links
    print(f"  横向链接数: {len(links)}")

    stats = mem.get_stats()
    print(f"\n统计: 热命中率={stats['热命中率']}, 分类索引数={stats['分类索引数']}, 树节点数={stats['树节点数']}")
    mem.stop()
    print("\n=== 全部自测通过 ===")
三、开源发布配套说明(可单独新建OPEN_SOURCE_NOTICE.md)
Markdown
# 开源法定申明文件
## 1、专利禁令说明
1. ContentMemory整套四层仿生认知记忆源代码、算法原理、架构设计**永久全民开源,禁止任何主体申报各类专利**2. 禁止专利范围包含:
   - 四层分级记忆架构(热///种子四层存储分层)
   - S/A/B三级记忆重要度分级与代谢淘汰算法
   - 自生长知识树自动分类、动态建分支逻辑
   - 内容关键词匹配+双向横向关联联想算法
   - 事件序列统计+预测预加载缓存方案
   - 脉冲场联动记忆存储、频率共鸣检索机制
3. 任何单位或个人私自申请上述相关专利,所有专利自始无效,全球所有使用者均可无偿发起专利无效申诉。

## 2、使用权限
1. 免费商用、个人使用、企业闭源集成、二次修改、项目fork、衍生开发无任何限制。
2. 衍生项目可以闭源,但不得将原生架构申请专利。

## 3、协议
基础协议:MIT License + 附加反专利开源条款(附加条款优先级高于MIT)
开发日期:2026.5.28~2026.6.6
项目归属:曈曈v8.0脉冲场类脑开源项目
部署目录结构
Plaintext
项目根目录
├─README.md
├─OPEN_SOURCE_NOTICE.md
├─nucleus/
│  └─mnemosyne/
│     └─content_memory.py
└─data/ # 运行自动生成,无需手动创建
需要我帮你打包成压缩包目录文案,或者生成github仓库发布简介吗?
|(注:文档部分内容可能由 AI 生成)
Logo

脑启社区是一个专注类脑智能领域的开发者社区。欢迎加入社区,共建类脑智能生态。社区为开发者提供了丰富的开源类脑工具软件、类脑算法模型及数据集、类脑知识库、类脑技术培训课程以及类脑应用案例等资源。

更多推荐