ContentMemory v8.0 四层仿生认知记忆引擎开源文档
ContentMemory v8.0 四层仿生认知记忆引擎开源文档整合成品:README.md + 完整源码一体化开源文档开源声明前置:本项目代码全量永久全民公开,任何人、机构不得针对本套四层认知记忆系统、自生长知识树、脉冲联动记忆架构申请发明专利、实用新型专利、软著独占专利;代码可无偿商用、修改、分发、二次衍生。项目:ContentMemory v8.0|曈曈v8.0脉冲场类脑架构|开源协议:M
ContentMemory v8.0 四层仿生认知记忆引擎开源文档
整合成品:README.md + 完整源码一体化开源文档
开源声明前置:本项目代码全量永久全民公开,任何人、机构不得针对本套四层认知记忆系统、自生长知识树、脉冲联动记忆架构申请发明专利、实用新型专利、软著独占专利;代码可无偿商用、修改、分发、二次衍生。
项目:ContentMemory v8.0|曈曈v8.0脉冲场类脑架构|开源协议:MIT+开源防专利附加条款|日期:2026-06-06
一、README.md(直接保存为项目根目录README.md)
Markdown
ContentMemory v8.0 四层仿生认知记忆引擎
基于脉冲场事件驱动架构 | Python3.8+ | 永久全民开源·禁止专利申报
重要开源法律附加约定(优先级高于MIT协议)
- 本项目全部源代码、架构思路、算法逻辑全民永久无偿公开,全球任意个人、企业可免费使用、商用、二次开发、闭源集成、分发复刻。
- 任何组织、个人不得以本项目:四层分级记忆架构、自生长知识树算法、自动分类逻辑、突触横向关联机制、记忆代谢淘汰算法、事件预测预加载、脉冲联动记忆方案单独或组合申请发明专利、实用新型专利、软件著作专利、技术专利;一经发现专利申请,所有相关专利天然无效,全体开源使用者可无偿举证作废。
- 允许衍生项目开源/闭源,但衍生项目不得将原生架构申请专利。
项目简介
ContentMemory 仿生四层人脑认知记忆引擎,模拟人脑工作记忆/短期记忆/长期记忆/本能种子记忆,配套自生长知识树、自动分类、横向联想关联、频率共鸣检索、时序事件预测预加载,原生适配脉冲场类脑智能框架,可用于AI智能体长期记忆、轻量化RAG知识库、机器人大脑、大模型对话记忆管理。
核心特性
- ✅ L1热/L2温/L3冷/L4种子 四层分级记忆,S/A/B三级记忆留存权重
- ✅ 自生长多层级知识树:自动归类内容、自动创建全新分类分支、动态扩充关键词库
- ✅ 突触式横向双向知识关联,检索自动联动关联知识库
- ✅ 四合一检索:分类定向检索+全文检索+频率共振检索+赫布突触关联检索
- ✅ 后台异步落盘+定时记忆代谢+事件预测主动预缓存
- ✅ 全量数据持久化:知识树、热点缓存JSON落地,重启自动加载恢复
- ✅ 线程安全,读写分离,后台业务线程不阻塞主进程
目录说明
nucleus/
└─mnemosyne/
└─content_memory.py # 主源码文件
data/ # 自动生成持久化目录
├─knowledge_tree.json
└─hot_cache.json
Plaintext
快速使用
from nucleus.mnemosyne.content_memory import ContentMemory
# 最简初始化
mem = ContentMemory(organ_name="demo_brain", hot_cache_size=200)
# 自动分类写入记忆
mem.hot_write("gpu_1050ti", "GTX1050Ti 4G DDR5独立显卡")
# 手动指定分类写入
mem.hot_write("crispr_01", "CRISPR基因编辑技术原理", category="生物工程/基因技术")
# S级永久种子记忆(永不遗忘)
mem.seed_set("agent_id", "曈曈v8类脑智能体")
# 分类检索
res = mem.search_by_category("显卡", limit=3)
print(res)
# 优雅退出自动保存知识树、热点缓存
mem.stop()
API参考
详见源码内注释,内置main自测代码,直接运行py文件自动全量自测。
迭代规划
v8.1:LLM语义自动分类优化
v8.2:Flask知识树可视化面板
v8.3:大库分片存储优化
Plaintext
## 二、content_memory.py(在你原有代码最顶部追加开源防专利声明,完整源码如下,直接全量替换保存)
```python
"""
nucleus/mnemosyne/content_memory.py — v8.0 脉冲场架构 · 认知记忆层核心
==========================================================================
【全民永久开源附加条款(强制生效,不可删除)】
1. 本代码、架构方案、自生长知识树算法、四层记忆分层设计、横向关联逻辑、记忆代谢策略全部内容全民无偿公开,永久开放。
2. 任何自然人、法人、企事业单位、科研机构严禁针对本文件包含的全部/局部技术方案申请任何类型专利(发明专利、实用新型、外观专利、软件知识产权专利),所有违规申报专利自申请之日起视作无效。
3. 使用者拥有无限制使用、商用、修改、分发、二次开发、闭源集成权利,无需原作者授权,无版权使用费。
开源主协议:MIT License
版本:v8.0
创建日期:2026年5月28日
升级日期:2026年6月6日
四层认知记忆系统 + 自生长多层级知识树 + 横向关联 + 频率共鸣加速
v8.0 知识系统升级(2026-06-06):
- 自生长知识树:新知识自动归纳,未知领域自动创建分支
- 横向关联:存入时扫描已有知识,自动建立跨分支链接
- 分类检索:先定位分支再精确匹配,大幅提升检索精度
- 记忆分级:S=永不遗忘, A=长期保留, B=正常代谢
- 树状结构持久化:重启后完整恢复分类体系
核心类:
ContentMemory — 四层认知记忆系统 + 自生长知识树
==========================================================================
"""
import sys
import os
import time
import json
import threading
import re
from collections import OrderedDict, deque
from typing import Any, Optional, Callable
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class ContentMemory:
"""四层认知记忆系统 + 自生长多层级知识树。"""
def __init__(self, organ_name: str, info_field=None, db_store=None,
hot_cache_size: int = 100, frequency_codec=None, hebbian_learner=None):
self.organ_name = organ_name
self.field = info_field
self.db = db_store
self.frequency_codec = frequency_codec
self.hebbian_learner = hebbian_learner
# 热记忆
self._hot_cache: OrderedDict = OrderedDict()
self._hot_cache_max = hot_cache_size
self._hot_access_times: dict[str, float] = {}
self._hot_lock = threading.Lock()
# 冷记忆
self._write_queue: deque = deque(maxlen=1000)
self._write_thread: Optional[threading.Thread] = None
self._write_running = False
# 种子记忆
self._seed_memory: dict = {}
# 预测缓存
self._predictive_queue: deque = deque(maxlen=50)
self._event_sequence: deque = deque(maxlen=200)
self._transition_probs: dict = {}
# ========== 🆕 自生长多层级知识分类树 ==========
self._knowledge_tree: dict = {} # 树状结构
self._category_index: dict[str, list] = {} # 分类路径 → 知识键列表
self._cross_links: dict[str, list] = {} # 横向关联:知识键 → 关联知识键列表
self._tree_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"data", "knowledge_tree.json"
)
self._init_knowledge_tree()
# ========== 🆕 自动分类关键词库(动态扩展) ==========
self._category_keywords: dict[str, list] = {
"框架设计/脉冲场架构": ["事件脉冲", "脉冲场", "频率编码", "脉冲发生器", "不应期", "噪声过滤"],
"框架设计/信息场": ["信息场", "场域阻尼", "状态平滑", "订阅熔断", "梯度感知"],
"框架设计/存算一体": ["存算一体", "器官持久化", "本地记忆", "去中心化"],
"框架设计/硬件容错": ["硬件容错", "梯度降级", "熔断", "自愈", "故障预测"],
"小林/个人信息": ["小林", "任桂林", "预算员", "一建", "备考"],
"小林/偏好": ["咖啡", "喜欢", "偏好"],
"小林/家人": ["路灯", "哥哥", "小曈曈", "女儿"],
"硬件知识/GPU": ["GPU", "显存", "1050", "nvidia", "显卡"],
"硬件知识/CPU": ["CPU", "Ryzen", "Intel", "处理器", "核心"],
"硬件知识/内存": ["内存", "RAM", "DDR", "48GB"],
"航天工程/推进系统": ["火箭", "推进", "比冲", "发动机"],
"航天工程/轨道力学": ["轨道", "轨道力学", "航天器轨道"],
"航天工程/航天器设计": ["航天器", "飞船", "卫星"],
"计算机科学/编程语言": ["Python", "代码", "编程", "函数", "算法", "排序"],
"计算机科学/数据库": ["数据库", "SQL", "DuckDB", "查询", "索引"],
"计算机科学/AI": ["机器学习", "神经网络", "深度学习", "模型训练"],
"对话经验/问候": ["问候", "早安", "晚安", "你好", "嗨"],
"对话经验/情绪回应": ["情绪", "开心", "难过", "焦虑", "生气", "累"],
"外部知识/GitHub": ["GitHub", "仓库", "开源", "Star"],
"外部知识/StackOverflow": ["StackOverflow", "问答", "编程问题"],
}
# 统计
self._stats = {
"热命中": 0, "热未命中": 0, "温命中": 0, "冷读取": 0,
"预测命中": 0, "总查询": 0, "代谢迁移次数": 0,
"分类命中": 0, "新增分支": 0, "横向链接数": 0,
}
self._hot_ttl_seconds = 600
self._warm_ttl_seconds = 3600
self._metabolism_interval = 300
self._metabolism_running = False
self._metabolism_thread: Optional[threading.Thread] = None
self._start_metabolism()
# 加载热记忆缓存
self._hot_cache_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"data", "hot_cache.json"
)
try:
if os.path.exists(self._hot_cache_path):
with open(self._hot_cache_path, 'r', encoding='utf-8') as f:
cached = json.load(f)
if isinstance(cached, dict):
now = time.time()
with self._hot_lock:
for k, v in cached.items():
if len(self._hot_cache) < self._hot_cache_max:
self._hot_cache[k] = v
self._hot_access_times[k] = now
print(f"📦 从磁盘加载了 {len(cached)} 条热记忆缓存")
except Exception:
pass
if self.field:
self.field.subscribe_pattern(r"knowledge\..*", self._on_knowledge_event)
self._auto_activate_events = [
"memory_tight", "gpu_overload", "gpu_mem_critical",
"low_memory_critical", "touch.alert", "resonance.formed"
]
for ev in self._auto_activate_events:
if self.field:
self.field.subscribe(ev, self._on_auto_activate_pulse)
# ========== 🆕 知识树初始化与持久化 ==========
def _init_knowledge_tree(self):
"""初始化知识树,优先从磁盘加载,否则创建空树。"""
try:
if os.path.exists(self._tree_path):
with open(self._tree_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self._knowledge_tree = data.get("tree", {})
self._category_index = data.get("index", {})
self._category_keywords = data.get("keywords", self._category_keywords)
self._cross_links = data.get("cross_links", {})
tree_size = self._count_tree_nodes(self._knowledge_tree)
print(f"🌳 从磁盘恢复了知识树:{tree_size}个节点,{len(self._category_index)}个分类索引")
return
except Exception:
pass
# 磁盘无数据,创建空树
self._knowledge_tree = {}
def _count_tree_nodes(self, tree: dict) -> int:
"""递归统计树节点数。"""
count = 0
for v in tree.values():
count += 1
if isinstance(v, dict):
count += self._count_tree_nodes(v)
return count
def _save_knowledge_tree(self):
"""保存知识树到磁盘。"""
try:
os.makedirs(os.path.dirname(self._tree_path), exist_ok=True)
with open(self._tree_path, 'w', encoding='utf-8') as f:
json.dump({
"tree": self._knowledge_tree,
"index": self._category_index,
"keywords": self._category_keywords,
"cross_links": self._cross_links,
"timestamp": time.time(),
}, f, ensure_ascii=False, indent=2)
except Exception:
pass
def _ensure_branch_exists(self, category_path: str):
"""确保树中存在指定路径的所有分支节点,不存在则自动创建。"""
parts = [p.strip() for p in category_path.split("/") if p.strip()]
if not parts:
return
current = self._knowledge_tree
for part in parts:
if part not in current:
current[part] = {}
self._stats["新增分支"] += 1
print(f"🌱 知识树自动生长:新分支「{part}」已创建(父路径:{'/'.join(parts[:parts.index(part)])})")
current = current[part]
# ========== 🆕 智能自动分类 ==========
def _extract_keywords(self, text: str, max_kw: int = 5) -> list:
"""从文本中提取关键词(中文2-4字词 + 英文单词)。"""
text = str(text)[:500]
# 中文关键词
chinese_words = re.findall(r'[\u4e00-\u9fff]{2,4}', text)
# 英文关键词
english_words = re.findall(r'[a-zA-Z]{3,}', text.lower())
# 合并去重,优先保留中文
all_words = list(set(chinese_words)) + list(set(english_words))
return all_words[:max_kw]
def _classify_knowledge(self, data: Any, source: str = "") -> str:
"""
智能自动分类:先匹配已有规则,未命中则自动创建新分支。
返回分类路径(如 "框架设计/脉冲场架构")。
"""
text = ""
if isinstance(data, dict):
text = json.dumps(data, ensure_ascii=False)
elif isinstance(data, str):
text = data
else:
text = str(data)
# 1. 匹配已有分类规则
best_match = None
best_score = 0
for category_path, keywords in self._category_keywords.items():
score = sum(1 for kw in keywords if kw.lower() in text.lower())
if score > best_score:
best_score = score
best_match = category_path
if best_match and best_score >= 2:
return best_match
# 2. 根据来源兜底
if "GitHub" in source:
return "外部知识/GitHub"
if "StackOverflow" in source:
return "外部知识/StackOverflow"
if "用户输入" in source or "用户记忆" in source:
return "对话经验"
# 3. 🆕 自动创建新分支
keywords = self._extract_keywords(text)
if len(keywords) >= 2:
# 用前两个关键词作为一级和二级分类
top_kw = keywords[0] if keywords else "未分类"
sub_kw = keywords[1] if len(keywords) > 1 else "通用"
new_path = f"{top_kw}/{sub_kw}"
# 注册到关键词库
if new_path not in self._category_keywords:
self._category_keywords[new_path] = keywords[:3]
self._ensure_branch_exists(new_path)
print(f"🌱 自动归纳新知识领域:{new_path}(关键词:{keywords[:3]})")
return new_path
return "外部知识/技术文章" # 最终兜底
def _add_to_category_index(self, category_path: str, key: str):
"""将知识键添加到分类索引,并建立横向链接。"""
self._ensure_branch_exists(category_path)
if category_path not in self._category_index:
self._category_index[category_path] = []
if key not in self._category_index[category_path]:
self._category_index[category_path].append(key)
if len(self._category_index[category_path]) > 200:
self._category_index[category_path] = self._category_index[category_path][-200:]
def _build_cross_links(self, key: str, data: Any, category_path: str):
"""为新知识建立横向链接:扫描热记忆中内容相关的条目。"""
text = str(data)[:300].lower()
if not text:
return
keywords = self._extract_keywords(text, max_kw=3)
if not keywords:
return
links = []
with self._hot_lock:
for other_key, entry in list(self._hot_cache.items())[:50]:
if other_key == key:
continue
val = entry.get("value", entry) if isinstance(entry, dict) else entry
other_text = str(val)[:300].lower()
# 关键词重叠度 ≥ 2 即建立链接
overlap = sum(1 for kw in keywords if kw.lower() in other_text)
if overlap >= 2:
links.append(other_key)
# 双向链接
if other_key not in self._cross_links:
self._cross_links[other_key] = []
if key not in self._cross_links[other_key]:
self._cross_links[other_key].append(key)
if links:
self._cross_links[key] = links
self._stats["横向链接数"] += len(links)
# ========== 热记忆(L1)==========
def hot_read(self, key: str) -> Optional[Any]:
with self._hot_lock:
self._stats["总查询"] += 1
if key in self._hot_cache:
entry = self._hot_cache.pop(key)
self._hot_cache[key] = entry
self._hot_access_times[key] = time.time()
self._stats["热命中"] += 1
# 兼容:如果是新格式字典则返回值字段,否则返回原值
if isinstance(entry, dict) and "value" in entry:
return entry["value"]
return entry
self._stats["热未命中"] += 1
return None
def hot_write(self, key: str, value: Any = None, category: str = "", importance: str = "B") -> None:
"""
写入热记忆。
importance: S=永不遗忘, A=长期保留, B=正常代谢
category: 留空则自动分类
"""
if value is None:
value = key
key = f"auto:{int(time.time_ns())}"
# 自动分类
if not category:
category = self._classify_knowledge(value, "")
with self._hot_lock:
if key in self._hot_cache:
del self._hot_cache[key]
self._hot_cache[key] = {
"value": value,
"category": category,
"importance": importance,
"stored_at": time.time(),
}
self._hot_access_times[key] = time.time()
while len(self._hot_cache) > self._hot_cache_max:
oldest = self._hot_cache.popitem(last=False)
self._hot_access_times.pop(oldest[0], None)
# 分类索引
self._add_to_category_index(category, key)
# 横向链接
self._build_cross_links(key, value, category)
# 频率共鸣注册
if self.frequency_codec and isinstance(value, str) and len(value) > 10:
try:
self.frequency_codec.add_to_resonance(value[:200], value[:200])
except Exception:
pass
def hot_contains(self, key: str) -> bool:
with self._hot_lock:
return key in self._hot_cache
# ========== 温记忆(L2)==========
def warm_read(self, event_type: str) -> Optional[Any]:
if not self.field:
return None
value = self.field.get_current(f"knowledge.{event_type}")
if value is not None:
self._stats["温命中"] += 1
return value
def warm_write(self, event_type: str, value: Any) -> None:
if not self.field:
return
from nucleus.pulse.pulse import Pulse
pulse = Pulse(
timestamp_ns=time.time_ns(),
source_organ=self.organ_name,
event_type=f"knowledge.{event_type}",
frequency_hz=0.1,
priority=2,
value=value,
ttl_ns=int(300e9),
)
self.field.publish(pulse)
def _on_knowledge_event(self, pulse) -> None:
pass
# ========== 冷记忆(L3)==========
def cold_read(self, query_key: str) -> Optional[Any]:
self._stats["冷读取"] += 1
if not self.db:
return None
try:
if hasattr(self.db, 'get_knowledge'):
return self.db.get_knowledge(query_key)
return None
except Exception:
return None
def cold_write_async(self, key: str, value: Any, category: str = "") -> None:
if not category:
category = self._classify_knowledge(value, "")
self._write_queue.append({
"key": key, "value": value, "category": category, "timestamp": time.time()
})
self._add_to_category_index(category, key)
if not self._write_running:
self._start_async_writer()
def _start_async_writer(self) -> None:
self._write_running = True
self._write_thread = threading.Thread(target=self._async_writer_loop, daemon=True)
self._write_thread.start()
def _async_writer_loop(self) -> None:
while self._write_running:
try:
batch = []
for _ in range(min(10, len(self._write_queue))):
if self._write_queue:
batch.append(self._write_queue.popleft())
if batch and self.db:
for item in batch:
if hasattr(self.db, 'save_knowledge'):
self.db.save_knowledge(item)
time.sleep(0.1)
except Exception:
time.sleep(0.5)
def cold_flush(self) -> None:
while self._write_queue:
item = self._write_queue.popleft()
if self.db and hasattr(self.db, 'save_knowledge'):
try:
self.db.save_knowledge(item)
except Exception:
pass
# ========== 种子记忆(L4)==========
def seed_set(self, key: str, value: Any) -> None:
self._seed_memory[key] = value
self.hot_write(f"seed:{key}", value, category="种子记忆", importance="S")
def seed_get(self, key: str, default: Any = None) -> Any:
return self._seed_memory.get(key, default)
def seed_get_all(self) -> dict:
return dict(self._seed_memory)
# ========== 🆕 分类检索 ==========
def search_by_category(self, query: str, limit: int = 5) -> list:
"""
分类检索:先预判查询属于哪个分类,再在分类内搜索。
"""
category = self._classify_knowledge(query, "")
results = []
# 在匹配的分类中搜索
if category in self._category_index:
keys = self._category_index[category][-50:]
for key in reversed(keys):
with self._hot_lock:
entry = self._hot_cache.get(key)
if entry:
val = entry.get("value", entry) if isinstance(entry, dict) else entry
if self._content_matches(val, query):
results.append({
"key": key, "value": val,
"layer": f"分类·{category}", "score": 0.95,
})
if len(results) >= limit:
self._stats["分类命中"] += 1
return results
# 横向链接扩展搜索
if results and results[0]["key"] in self._cross_links:
for linked_key in self._cross_links[results[0]["key"]]:
with self._hot_lock:
entry = self._hot_cache.get(linked_key)
if entry:
val = entry.get("value", entry) if isinstance(entry, dict) else entry
results.append({
"key": linked_key, "value": val,
"layer": "横向关联", "score": 0.7,
})
if len(results) >= limit:
return results
# 回退到全局搜索
return self.search_by_content(query, limit)
# ========== 内容寻址检索 ==========
def search_by_content(self, query: str, limit: int = 5) -> list:
results = []
if self.frequency_codec:
resonance_results = self.frequency_codec.query_resonance(query, limit=limit)
for r in resonance_results:
results.append({
"key": f"resonance:{r['freq']:.2f}",
"value": r["value"],
"layer": "频率共鸣",
"score": r["score"]
})
if len(results) >= limit:
return results
query_lower = query.lower()
if self.hebbian_learner and hasattr(self.hebbian_learner, 'get_top_connections'):
try:
top_connections = self.hebbian_learner.get_top_connections(limit=20)
for conn in top_connections:
src = conn.get("from", "")
dst = conn.get("to", "")
weight = conn.get("weight", 0)
if weight > 0.3 and (
any(kw in src.lower() + dst.lower() for kw in query_lower.split())
or query_lower in src.lower() or query_lower in dst.lower()
):
with self._hot_lock:
for key, entry in self._hot_cache.items():
val = entry.get("value", entry) if isinstance(entry, dict) else entry
if self._content_matches(val, query_lower):
results.append({
"key": f"hebbian:{src}→{dst}",
"value": val,
"layer": "赫布激活",
"score": 0.8 * weight
})
if len(results) >= limit:
return results
except Exception:
pass
with self._hot_lock:
for key, entry in self._hot_cache.items():
val = entry.get("value", entry) if isinstance(entry, dict) else entry
if self._content_matches(val, query_lower):
results.append({"key": key, "value": val, "layer": "热记忆", "score": 0.9})
if len(results) >= limit:
return results
if self.field:
warm_keys = [k for k in self.field._current_state.keys() if k.startswith("knowledge.")]
for key in warm_keys:
value = self.field.get_current(key)
if value and self._content_matches(value, query_lower):
results.append({"key": key, "value": value, "layer": "温记忆", "score": 0.7})
if len(results) >= limit:
return results
if len(results) < limit:
if self.db and hasattr(self.db, 'search_conversation'):
try:
conv_results = self.db.search_conversation(query, limit=limit - len(results))
for item in conv_results:
results.append({
"key": item.get("id", "conv"),
"value": item.get("content", str(item)),
"layer": "冷记忆·对话",
"score": 0.6
})
except Exception:
pass
else:
cold_result = self.cold_read(query)
if cold_result:
results.append({"key": query, "value": cold_result, "layer": "冷记忆", "score": 0.5})
if results:
layers = list(set(r.get("layer", "未知") for r in results))
print(f"📊 内容寻址命中来源: {layers},共{len(results)}条")
return results[:limit]
def _content_matches(self, value: Any, query_lower: str) -> bool:
if isinstance(value, str):
text = value
elif isinstance(value, dict):
if "value" in value:
inner = value["value"]
text = str(inner) if not isinstance(inner, str) else inner
else:
text = json.dumps(value, ensure_ascii=False)
elif isinstance(value, (list, tuple)):
text = " ".join(str(item) for item in value)
else:
text = str(value)
text_lower = text.lower()
query_lower = query_lower.strip().lower()
if len(query_lower) <= 3:
return query_lower in text_lower
ngrams = set()
for i in range(len(query_lower) - 1):
bigram = query_lower[i:i+2]
if bigram.strip() and not bigram.isspace():
ngrams.add(bigram)
if not ngrams:
return query_lower in text_lower
hit_count = sum(1 for ng in ngrams if ng in text_lower)
return (hit_count / len(ngrams)) >= 0.6
# ========== 记忆代谢 ==========
def _start_metabolism(self) -> None:
self._metabolism_running = True
self._metabolism_thread = threading.Thread(target=self._metabolism_loop, daemon=True)
self._metabolism_thread.start()
def _metabolism_loop(self) -> None:
while self._metabolism_running:
time.sleep(self._metabolism_interval)
try:
self._metabolize()
except Exception:
pass
def _metabolize(self) -> None:
now = time.time()
migrated_count = 0
with self._hot_lock:
keys_to_migrate = []
for key, last_access in list(self._hot_access_times.items()):
if now - last_access > self._hot_ttl_seconds:
entry = self._hot_cache.get(key)
if isinstance(entry, dict) and entry.get("importance") in ("S", "A"):
continue
keys_to_migrate.append(key)
for key in keys_to_migrate:
entry = self._hot_cache.get(key)
if entry is not None:
val = entry.get("value", entry) if isinstance(entry, dict) else entry
topic_key = key.replace(":", "_").replace(" ", "_")[:80]
self.warm_write(f"migrated.{topic_key}", str(val)[:300])
del self._hot_cache[key]
self._hot_access_times.pop(key, None)
migrated_count += 1
if migrated_count > 0:
self._stats["代谢迁移次数"] += migrated_count
print(f"🧠 记忆代谢:{migrated_count}条热记忆迁移至温记忆")
# ========== 预测预激活 ==========
def record_event_sequence(self, event_type: str) -> None:
self._event_sequence.append(event_type)
if len(self._event_sequence) >= 2:
prev = self._event_sequence[-2]
curr = self._event_sequence[-1]
if prev not in self._transition_probs:
self._transition_probs[prev] = {}
self._transition_probs[prev][curr] = self._transition_probs[prev].get(curr, 0) + 1
def predict_next_events(self, current_event: str, top_k: int = 3) -> list:
if current_event not in self._transition_probs:
return []
transitions = self._transition_probs[current_event]
total = sum(transitions.values())
if total == 0:
return []
sorted_trans = sorted(transitions.items(), key=lambda x: x[1], reverse=True)
return [(e, c / total) for e, c in sorted_trans[:top_k]]
def predictive_preload(self, current_event: str) -> int:
predictions = self.predict_next_events(current_event)
preloaded = 0
for event_type, prob in predictions:
if prob < 0.3:
continue
value = self.warm_read(event_type)
if value is not None:
self.hot_write(f"predictive:{event_type}", value)
preloaded += 1
if preloaded > 0:
self._stats["预测命中"] += 1
return preloaded
# ========== 统一查询 ==========
def query(self, key: str, context_event: str = "") -> Optional[Any]:
value = self.hot_read(key)
if value is not None:
return value
value = self.warm_read(key)
if value is not None:
self.hot_write(key, value)
return value
value = self.cold_read(key)
if value is not None:
self.hot_write(key, value)
self.warm_write(key, value)
return value
if context_event:
self.record_event_sequence(context_event)
self.predictive_preload(context_event)
return None
# ========== 脉冲同步 ==========
def sync_from_pulse(self, pulse) -> None:
content = None
if pulse.value:
if isinstance(pulse.value, dict):
for field in ("content", "text", "summary", "title", "data"):
if field in pulse.value and pulse.value[field]:
content = pulse.value[field]
break
if content is None:
content = str(pulse.value)[:200]
elif isinstance(pulse.value, str):
if len(pulse.value) > 3:
content = pulse.value[:200]
else:
content = str(pulse.value)[:200]
if content is None:
return
key = f"pulse:{pulse.event_type}:{int(time.time())}"
self.hot_write(key, content)
if len(pulse.event_type) < 50:
self.warm_write(pulse.event_type, content)
def _on_auto_activate_pulse(self, pulse):
event_name = pulse.event_type
if self.frequency_codec:
related = self.frequency_codec.query_resonance(event_name, limit=5)
for item in related:
self.hot_write(f"auto:{event_name}:{item['freq']:.2f}", item['value'])
warm_val = self.warm_read(event_name)
if warm_val is not None:
self.hot_write(f"auto:{event_name}:warm", warm_val)
# ========== 统计 ==========
def get_stats(self) -> dict:
with self._hot_lock:
s = dict(self._stats)
s["热缓存大小"] = len(self._hot_cache)
s["写入队列大小"] = len(self._write_queue)
s["转移规则数"] = len(self._transition_probs)
s["分类索引数"] = len(self._category_index)
s["树节点数"] = self._count_tree_nodes(self._knowledge_tree)
total = max(1, s["总查询"])
s["热命中率"] = round(s["热命中"] / total, 3)
s["温命中率"] = round(s["温命中"] / total, 3)
return s
# ========== 知识树查询 ==========
def get_knowledge_tree(self) -> dict:
"""获取当前知识树的完整结构。"""
return {
"tree": self._knowledge_tree,
"index_count": len(self._category_index),
"cross_links_count": len(self._cross_links),
"keywords_count": len(self._category_keywords),
}
# ========== 生命周期 ==========
def stop(self) -> None:
self._metabolism_running = False
# 保存热记忆
try:
with self._hot_lock:
data = dict(list(self._hot_cache.items())[-50:])
with open(self._hot_cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False)
print(f"💾 热记忆缓存已保存到磁盘({len(data)}条)")
except Exception:
pass
# 保存知识树
self._save_knowledge_tree()
print(f"🌳 知识树已保存:{self._count_tree_nodes(self._knowledge_tree)}个节点,{len(self._category_index)}个分类")
self._write_running = False
self.cold_flush()
if self._write_thread:
self._write_thread.join(timeout=5)
if self._metabolism_thread:
self._metabolism_thread.join(timeout=5)
# 自测
if __name__ == "__main__":
print("=== ContentMemory 自测 ===\n")
from nucleus.chronos.event_stream import EventStream
from nucleus.field.info_field import InfoField
stream = EventStream(max_length=100)
field = InfoField(event_stream=stream)
mem = ContentMemory("测试器官", info_field=field)
print("测试1:热记忆读写(带分类)")
mem.hot_write("test_key", "事件脉冲驱动的核心是脉冲发生器", category="框架设计/脉冲场架构")
val = mem.hot_read("test_key")
print(f" 写入后读取: {val}")
assert val is not None
print("测试2:自动分类")
category = mem._classify_knowledge("GPU显存不足触发熔断", "")
print(f" 自动分类: {category}")
assert "GPU" in category
print("测试3:自动创建新分支")
new_category = mem._classify_knowledge("生物基因编辑技术CRISPR的应用", "")
print(f" 新分支: {new_category}")
tree_info = mem.get_knowledge_tree()
print(f" 树节点数: {tree_info['index_count']}")
print("测试4:种子记忆(S级)")
mem.seed_set("核心身份", "测试新人类")
val = mem.seed_get("核心身份")
print(f" 种子记忆: {val}")
assert val == "测试新人类"
print("测试5:横向链接")
mem.hot_write("link_test_1", "脉冲场架构是v8.0的核心设计")
mem.hot_write("link_test_2", "脉冲发生器负责发射事件脉冲")
links = mem._cross_links
print(f" 横向链接数: {len(links)}")
stats = mem.get_stats()
print(f"\n统计: 热命中率={stats['热命中率']}, 分类索引数={stats['分类索引数']}, 树节点数={stats['树节点数']}")
mem.stop()
print("\n=== 全部自测通过 ===")
三、开源发布配套说明(可单独新建OPEN_SOURCE_NOTICE.md)
Markdown
# 开源法定申明文件
## 1、专利禁令说明
1. ContentMemory整套四层仿生认知记忆源代码、算法原理、架构设计**永久全民开源,禁止任何主体申报各类专利**。
2. 禁止专利范围包含:
- 四层分级记忆架构(热/温/冷/种子四层存储分层)
- S/A/B三级记忆重要度分级与代谢淘汰算法
- 自生长知识树自动分类、动态建分支逻辑
- 内容关键词匹配+双向横向关联联想算法
- 事件序列统计+预测预加载缓存方案
- 脉冲场联动记忆存储、频率共鸣检索机制
3. 任何单位或个人私自申请上述相关专利,所有专利自始无效,全球所有使用者均可无偿发起专利无效申诉。
## 2、使用权限
1. 免费商用、个人使用、企业闭源集成、二次修改、项目fork、衍生开发无任何限制。
2. 衍生项目可以闭源,但不得将原生架构申请专利。
## 3、协议
基础协议:MIT License + 附加反专利开源条款(附加条款优先级高于MIT)
开发日期:2026.5.28~2026.6.6
项目归属:曈曈v8.0脉冲场类脑开源项目
部署目录结构
Plaintext
项目根目录
├─README.md
├─OPEN_SOURCE_NOTICE.md
├─nucleus/
│ └─mnemosyne/
│ └─content_memory.py
└─data/ # 运行自动生成,无需手动创建
需要我帮你打包成压缩包目录文案,或者生成github仓库发布简介吗?
|(注:文档部分内容可能由 AI 生成)
更多推荐

所有评论(0)