源码级别解析 · 基于73,431⭐的智能代理框架架构剖析
2026-05-14 | 每日技术深度解读
OpenHands已成为AI驱动开发领域的标杆项目
重新定义AI在软件工程中的应用边界
模块化设计支持多种使用场景
完整的全栈AI开发解决方案
覆盖软件开发生命周期的全链路AI能力
在SWE-bench等权威AI代码评测中表现优异
# OpenHands SDK核心架构
from typing import Any

from openhands.core.agent import BaseAgent
from openhands.core.logger import Logger
from openhands.core.schema import (
    Action,
    AgentState,
    Observation,
    Result,
)
class OpenHandsSDK:
    """Composable entry point for building and running AI agents.

    Wires a logger, an agent-state object and a default agent together from a
    plain configuration mapping.
    """

    def __init__(self, config: dict):
        """Create the SDK facade.

        Args:
            config: Configuration mapping. ``log_level`` is read for the
                logger; the whole mapping is forwarded to every agent created.
        """
        # Keep the configuration: create_agent() forwards it to new agents.
        # The original never stored it, so create_agent() hit AttributeError.
        self.config = config
        self.logger = Logger(config.get('log_level'))
        self.state = AgentState()
        self.agent = BaseAgent(config)

    def create_agent(self, agent_type: str) -> "BaseAgent":
        """Create a specialised agent of the requested type.

        Args:
            agent_type: ``'code_assistant'`` or ``'test_executor'``.

        Returns:
            A new agent built from this SDK's configuration.

        Raises:
            ValueError: If ``agent_type`` is not a known agent type (the
                original silently returned ``None``).
        """
        if agent_type == 'code_assistant':
            return CodeAssistantAgent(self.config)
        if agent_type == 'test_executor':
            return TestExecutorAgent(self.config)
        # ...more agent types go here
        raise ValueError(f"未知的代理类型: {agent_type}")

    def execute_task(self, task: str) -> "Result":
        """Log ``task`` and run it on the default agent, returning its result."""
        self.logger.info(f"开始执行任务: {task}")
        return self.agent.execute(task)
SDK架构支持插件化扩展和多代理协作
专业化代理分工协作,提升任务处理效率
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional
class AgentState(Enum):
    """Lifecycle states of an agent task, as stored in ``AgentContext.state``.

    NOTE(review): this definition shadows the ``AgentState`` imported from
    ``openhands.core.schema`` at the top of the file — confirm which one each
    caller is meant to use.
    """

    IDLE = "idle"  # initial state assigned by AgentStateManager.create_context
    PLANNING = "planning"
    EXECUTING = "executing"
    REVIEWING = "reviewing"
    COMPLETED = "completed"
    ERROR = "error"
@dataclass
class AgentContext:
    """Execution context tracked for a single agent task.

    Attributes:
        current_task: Description of the task being worked on.
        state: Current lifecycle state of the agent.
        action_history: Actions recorded for this task.
        observations: Observations recorded for this task.
        artifacts: Named artifacts; values may be of any type.
        metrics: Named numeric measurements.
    """

    current_task: str
    state: AgentState
    action_history: List[Action]
    observations: List[Observation]
    # typing.Any, not the builtin any() function, is the "any type" marker.
    artifacts: Dict[str, Any]
    metrics: Dict[str, float]
class AgentStateManager:
    """Registry of AgentContext objects keyed by task id."""

    def __init__(self):
        # task_id -> context for that task
        self.contexts: Dict[str, AgentContext] = {}

    def create_context(self, task_id: str, task: str) -> AgentContext:
        """Create an IDLE context for ``task`` and register it under ``task_id``.

        Any context previously stored under the same ``task_id`` is replaced.

        Returns:
            The newly created context.
        """
        fresh = AgentContext(
            current_task=task,
            state=AgentState.IDLE,
            action_history=list(),
            observations=list(),
            artifacts=dict(),
            metrics=dict(),
        )
        self.contexts[task_id] = fresh
        return fresh
状态管理确保代理行为的可追溯性和一致性
原子化动作设计支持复杂任务的组合执行
class ActionExecutor:
    """Dispatches actions to registered executors and wraps results as observations."""

    def __init__(self, env: "ExecutionEnvironment"):
        """
        Args:
            env: Environment handed to every executor. The annotation is a
                string because ExecutionEnvironment is defined further down in
                this module; the bare name raised NameError when this class
                body was executed.
        """
        self.env = env
        # NOTE(review): ActionRegistry is not imported anywhere in this file — confirm its source.
        self.action_registry = ActionRegistry()

    def execute_action(self, action: Action) -> Observation:
        """Execute a single action.

        The executor is looked up by the action's class name. Unknown action
        types and executor exceptions are reported as an Observation with
        ``type="error"`` rather than raised.
        """
        action_type = action.__class__.__name__
        executor = self.action_registry.get_executor(action_type)
        if executor is None:
            return Observation(
                type="error",
                content=f"未知的动作类型: {action_type}"
            )
        try:
            result = executor.execute(action, self.env)
            return Observation(
                type="success",
                content=result,
                metadata=action.metadata
            )
        except Exception as e:
            # Boundary: any executor failure becomes an error observation.
            return Observation(
                type="error",
                content=str(e),
                metadata=action.metadata
            )

    def execute_plan(self, plan: List[Action]) -> List[Observation]:
        """Execute ``plan`` in order, stopping after the first error observation.

        Returns:
            The observations produced, including the error that stopped the plan.
        """
        observations = []
        for action in plan:
            observation = self.execute_action(action)
            observations.append(observation)
            if observation.type == "error":
                break
        return observations
动作执行引擎将异常统一转换为错误观察，便于错误恢复；示例中的动作计划按顺序执行，并在首个错误处停止
多维度观察系统提供全面的执行反馈
class ObservationProcessor:
    """Parses observations with type-specific parsers and groups batches by type."""

    def __init__(self):
        # Observation.type -> parser object exposing parse(observation)
        self.parsers = {
            'file': FileObservationParser(),
            'code': CodeObservationParser(),
            'test': TestObservationParser(),
            'error': ErrorObservationParser()
        }

    def process(self, observation: "Observation") -> "ProcessedObservation":
        """Parse one observation.

        Observations without a registered parser go to
        ``_default_processing``; if the parser (or context extraction) raises,
        the error is logged and ``_fallback_processing`` is used instead.
        """
        parser = self.parsers.get(observation.type)
        if parser is None:
            return self._default_processing(observation)
        try:
            processed = parser.parse(observation)
            # Attach context information to the parsed result.
            processed.context = self._extract_context(observation)
        except Exception as e:
            # `logger` was never defined here, so the original raised NameError
            # from inside this handler and hid the real parsing failure.
            import logging
            logging.getLogger(__name__).exception("观察处理失败: %s", e)
            return self._fallback_processing(observation)
        return processed

    def aggregate_observations(self, observations: List["Observation"]) -> "AggregatedObservation":
        """Group ``observations`` by their ``type`` (first-seen order) and wrap the result.

        NOTE(review): ProcessedObservation / AggregatedObservation are not
        imported anywhere in this file; the quoted annotations avoid a
        NameError at class definition — confirm where they come from.
        """
        by_type: Dict[str, List["Observation"]] = {}
        for obs in observations:
            by_type.setdefault(obs.type, []).append(obs)
        return AggregatedObservation(by_type)
观察处理器按类型解析观察数据，并支持按类型聚合多个观察结果
多环境支持适应不同的开发场景
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class ExecutionEnvironment(ABC):
    """Abstract backend that runs commands and manipulates files for an agent.

    Concrete environments must implement all four operations below before
    they can be instantiated.
    """

    @abstractmethod
    def execute_command(self, command: str) -> Dict[str, Any]:
        """Run ``command`` and describe the outcome as a dictionary."""

    @abstractmethod
    def read_file(self, path: str) -> str:
        """Return the text stored at ``path``."""

    @abstractmethod
    def write_file(self, path: str, content: str) -> bool:
        """Store ``content`` at ``path``; report success as a bool."""

    @abstractmethod
    def list_files(self, path: str) -> List[str]:
        """Return the entries found under ``path``."""
class DockerEnvironment(ExecutionEnvironment):
    """Execution environment backed by a detached Docker container.

    Call ``start_container()`` before any other method. All four
    ExecutionEnvironment operations are implemented; without read_file /
    write_file / list_files the class stayed abstract and could not be
    instantiated at all.
    """

    def __init__(self, image: str, volume_mapping: Dict[str, str]):
        """
        Args:
            image: Docker image to run.
            volume_mapping: Host path -> container path bind mounts.
        """
        self.image = image
        self.volume_mapping = volume_mapping
        self.container_id = None  # set by start_container()

    def start_container(self):
        """Start the container in the background and remember its id.

        Raises:
            RuntimeError: If ``docker run`` exits with a non-zero status.
        """
        import subprocess

        cmd = ["docker", "run", "-d", "--rm"]
        for host_path, container_path in self.volume_mapping.items():
            cmd.extend(["-v", f"{host_path}:{container_path}"])
        cmd.append(self.image)
        # NOTE(review): no command is passed, so the container only stays up if
        # the image's default CMD is long-running — confirm for the images used.
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            # Previously the (empty) stdout was stored as the container id and
            # every later `docker exec` failed with a confusing error.
            raise RuntimeError(f"docker run 失败: {result.stderr.strip()}")
        self.container_id = result.stdout.strip()

    def _docker_exec(self, args: List[str], input_text: Optional[str] = None):
        """Run ``args`` inside the container via ``docker exec`` (no shell on the host).

        Raises:
            RuntimeError: If the container has not been started.
        """
        import subprocess

        if not self.container_id:
            raise RuntimeError("容器未启动, 请先调用 start_container()")
        cmd = ["docker", "exec"]
        if input_text is not None:
            cmd.append("-i")  # keep stdin open so input_text reaches the command
        cmd.append(self.container_id)
        cmd.extend(args)
        return subprocess.run(cmd, input=input_text, capture_output=True, text=True)

    def execute_command(self, command: str) -> Dict[str, Any]:
        """Run ``command`` in the container.

        Returns:
            ``{"exit_code", "stdout", "stderr"}`` of the executed command.
        """
        import shlex

        # shlex honours quoting, unlike str.split() which broke arguments such
        # as `echo "a b"` into separate words.
        result = self._docker_exec(shlex.split(command))
        return {
            "exit_code": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr
        }

    def read_file(self, path: str) -> str:
        """Return the text content of ``path`` inside the container.

        Raises:
            OSError: If the file cannot be read.
        """
        result = self._docker_exec(["cat", "--", path])
        if result.returncode != 0:
            raise OSError(f"读取文件失败 {path}: {result.stderr.strip()}")
        return result.stdout

    def write_file(self, path: str, content: str) -> bool:
        """Write ``content`` to ``path`` inside the container; True on success."""
        # The path is passed as positional argument $1 to sh, so it is never
        # re-parsed as shell syntax.
        result = self._docker_exec(["sh", "-c", 'cat > "$1"', "sh", path],
                                   input_text=content)
        return result.returncode == 0

    def list_files(self, path: str) -> List[str]:
        """Return the entry names in directory ``path`` inside the container.

        Raises:
            OSError: If the directory cannot be listed.
        """
        result = self._docker_exec(["ls", "-1A", "--", path])
        if result.returncode != 0:
            raise OSError(f"列出文件失败 {path}: {result.stderr.strip()}")
        return result.stdout.splitlines()
环境抽象层支持灵活的执行环境切换
多LLM支持满足不同场景需求
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class LLMAdapter(ABC):
    """Common interface that hides the differences between LLM providers."""

    @abstractmethod
    def chat(self, messages: List[Dict[str, str]]) -> str:
        """Reply to a list of role/content messages with the model's text."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        """Continue ``prompt``; ``kwargs`` carry provider-specific options."""

    @abstractmethod
    def embed(self, text: str) -> List[float]:
        """Map ``text`` to an embedding vector."""
class OpenAIAdapter(LLMAdapter):
    """LLMAdapter implementation backed by the OpenAI Python client."""

    def __init__(self, api_key: str, model: str = "gpt-4",
                 embedding_model: str = "text-embedding-3-small"):
        """
        Args:
            api_key: OpenAI API key.
            model: Model used by chat() and complete().
            embedding_model: Model used by embed(); chat models such as
                "gpt-4" cannot produce embeddings.
        """
        # NOTE(review): `OpenAI` is not imported in this excerpt; presumably
        # `from openai import OpenAI` — confirm.
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.embedding_model = embedding_model

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """Send a chat history and return the assistant's reply text."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=4000
        )
        # content is None when the reply carries no text (e.g. only tool
        # calls); return "" so the declared str return type holds.
        return response.choices[0].message.content or ""

    def complete(self, prompt: str, **kwargs) -> str:
        """Complete ``prompt`` with the legacy Completions endpoint.

        ``kwargs`` are forwarded to the API and override the defaults
        (``temperature=0.7``, ``max_tokens=1000``, ``model=self.model``); the
        original accepted but silently ignored them.
        """
        # NOTE(review): the legacy Completions endpoint serves completion
        # models (e.g. "gpt-3.5-turbo-instruct"), not chat models such as the
        # default "gpt-4" — confirm the configured model.
        params = {"model": self.model, "temperature": 0.7, "max_tokens": 1000, **kwargs}
        response = self.client.completions.create(prompt=prompt, **params)
        return response.choices[0].text.strip()

    def embed(self, text: str) -> List[float]:
        """Return the embedding vector of ``text``.

        Implementing this abstract method is what makes the adapter
        instantiable; without it the ABC raised TypeError on construction.
        """
        response = self.client.embeddings.create(model=self.embedding_model, input=text)
        return list(response.data[0].embedding)
class ClaudeAdapter(LLMAdapter):
    """LLMAdapter implementation backed by the Anthropic Messages API.

    complete() and embed() are implemented so the class is no longer
    abstract; previously constructing it raised TypeError.
    """

    def __init__(self, api_key: str, model: str = "claude-3-sonnet-20240229"):
        # NOTE(review): `Anthropic` is not imported in this excerpt; presumably
        # `from anthropic import Anthropic` — confirm.
        self.client = Anthropic(api_key=api_key)
        self.model = model

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """Send a chat history and return the concatenated reply text.

        ``role="system"`` entries are moved into the API's separate ``system``
        parameter: the Messages API rejects them inside ``messages``, which
        broke OpenAI-style histories passed through this adapter.
        """
        system_parts = [m["content"] for m in messages if m.get("role") == "system"]
        conversation = [m for m in messages if m.get("role") != "system"]
        request = {"model": self.model, "max_tokens": 4000, "messages": conversation}
        if system_parts:
            request["system"] = "\n\n".join(system_parts)
        response = self.client.messages.create(**request)
        return self._text_of(response)

    def complete(self, prompt: str, **kwargs) -> str:
        """Complete ``prompt`` as a single user turn.

        ``kwargs`` are forwarded to ``messages.create`` and override the
        defaults (``model=self.model``, ``max_tokens=1000``).
        """
        params = {"model": self.model, "max_tokens": 1000, **kwargs}
        response = self.client.messages.create(
            messages=[{"role": "user", "content": prompt}],
            **params
        )
        return self._text_of(response).strip()

    def embed(self, text: str) -> List[float]:
        """Not supported: Anthropic exposes no embeddings endpoint.

        Raises:
            NotImplementedError: Always; use another adapter for embeddings.
        """
        raise NotImplementedError("Anthropic 不提供文本向量化接口, 请使用其他适配器的 embed()")

    @staticmethod
    def _text_of(response) -> str:
        """Join the text of all text blocks in ``response.content``.

        The content list may be empty or start with a non-text block, so
        ``content[0].text`` is not safe.
        """
        return "".join(
            block.text for block in response.content
            if getattr(block, "type", None) == "text"
        )
适配器模式支持多LLM无缝切换
智能提示工程提升AI代理的决策质量
from jinja2 import Template
from typing import Dict, Any
class PromptTemplate:
    """Thin wrapper around a compiled Jinja2 template."""

    def __init__(self, template_string: str):
        compiled = Template(template_string)
        self.template = compiled

    def render(self, context: Dict[str, Any]) -> str:
        """Render the template, exposing every key of ``context`` as a variable."""
        rendered = self.template.render(**context)
        return rendered
class PromptManager:
    """Picks a prompt template per task type and renders it with enriched context."""

    def __init__(self):
        # (task type, loader method name); loaders are called in this order and
        # each returns an object exposing render(context_dict).
        loader_names = (
            ('code_generation', '_load_code_gen_template'),
            ('test_generation', '_load_test_gen_template'),
            ('code_review', '_load_review_template'),
            ('debug_assist', '_load_debug_template'),
        )
        self.templates = {task: getattr(self, loader)() for task, loader in loader_names}

    def generate_prompt(self, task_type: str, context: Dict[str, Any]) -> str:
        """Render the template registered for ``task_type``.

        Unknown task types fall back to ``_fallback_prompt(context)``; known
        ones are rendered with ``_enhance_context(context)``.
        """
        chosen = self.templates.get(task_type)
        if chosen is not None:
            return chosen.render(self._enhance_context(context))
        return self._fallback_prompt(context)

    def _load_code_gen_template(self) -> Template:
        """Build the Jinja2 template used for code-generation prompts."""
        return Template("""
任务: {{ task_description }}
项目信息:
- 技术栈: {{ tech_stack }}
- 编程语言: {{ programming_language }}
- 框架: {{ framework }}
已有代码:
```{{ programming_language }}
{{ existing_code }}
```
请根据以上信息生成高质量的代码,要求:
1. 代码风格统一
2. 遵循最佳实践
3. 添加适当注释
4. 考虑错误处理
5. 性能优化
""")

    def _enhance_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``context`` with file count, line count and complexity added."""
        source = context.get('code', '')
        enhanced = context.copy()
        enhanced.update(
            file_count=len(context.get('files', [])),
            line_count=self._count_lines(source),
            complexity=self._calculate_complexity(source),
        )
        return enhanced
模板化提示生成系统支持定制化需求
记忆系统确保代理行为的连贯性和学习性
import json
import pickle
from typing import Dict, List, Any, Optional
from datetime import datetime
class MemorySystem:
    """In-memory store for short-term, long-term and strategy memories.

    Entries are plain dicts stamped with an ISO-8601 creation time and a
    ``type`` tag ('short_term', 'long_term' or 'strategy').
    """

    # Maximum number of short-term entries retained; the oldest are dropped first.
    SHORT_TERM_LIMIT = 100

    def __init__(self, memory_path: str):
        """
        Args:
            memory_path: Stored on the instance. NOTE(review): nothing in this
                excerpt reads or writes it — persistence presumably lives elsewhere.
        """
        self.memory_path = memory_path
        self.short_term_memory: List[Dict[str, Any]] = []
        self.long_term_memory: List[Dict[str, Any]] = []
        self.strategy_memory: List[Dict[str, Any]] = []

    def add_short_term_memory(self, content: Dict[str, Any],
                              session_id: str, task_id: str):
        """Record ``content`` for a session/task, keeping only the newest SHORT_TERM_LIMIT entries."""
        self.short_term_memory.append({
            'timestamp': datetime.now().isoformat(),
            'session_id': session_id,
            'task_id': task_id,
            'content': content,
            'type': 'short_term'
        })
        if len(self.short_term_memory) > self.SHORT_TERM_LIMIT:
            self.short_term_memory = self.short_term_memory[-self.SHORT_TERM_LIMIT:]

    def add_long_term_memory(self, content: Dict[str, Any],
                             project_id: str, tags: List[str]):
        """Record ``content`` for ``project_id`` with descriptive ``tags`` (unbounded)."""
        self.long_term_memory.append({
            'timestamp': datetime.now().isoformat(),
            'project_id': project_id,
            'tags': tags,
            'content': content,
            'type': 'long_term'
        })

    def add_strategy_memory(self, strategy: Dict[str, Any],
                            success_rate: float, project_id: str):
        """Record a ``strategy`` and its observed ``success_rate`` for ``project_id``."""
        self.strategy_memory.append({
            'timestamp': datetime.now().isoformat(),
            'project_id': project_id,
            'strategy': strategy,
            'success_rate': success_rate,
            'type': 'strategy'
        })

    @staticmethod
    def _payload(memory: Dict[str, Any]) -> Any:
        """Return the searchable data of an entry.

        Strategy entries keep their data under 'strategy' rather than
        'content'; reading memory['content'] raised KeyError for them.
        """
        return memory['strategy'] if memory.get('type') == 'strategy' else memory['content']

    def search_memory(self, query: str, memory_type: Optional[str] = None,
                      limit: int = 10,
                      project_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return up to ``limit`` entries whose payload matches ``query``.

        Args:
            query: Text passed to ``_is_match`` (defined outside this excerpt).
            memory_type: 'short_term', 'long_term', 'strategy', or None for all.
            limit: Maximum number of results; values <= 0 return [].
            project_id: If given, only entries stored with this project id
                are considered (short-term entries carry none).

        Results keep pool order: short-term, long-term, then strategy.
        """
        pools = []
        if memory_type in (None, 'short_term'):
            pools.append(self.short_term_memory)
        if memory_type in (None, 'long_term'):
            pools.append(self.long_term_memory)
        if memory_type in (None, 'strategy'):
            pools.append(self.strategy_memory)

        results: List[Dict[str, Any]] = []
        if limit <= 0:
            return results
        # Simple text-match search; stop as soon as `limit` hits are found.
        for pool in pools:
            for memory in pool:
                if project_id is not None and memory.get('project_id') != project_id:
                    continue
                if self._is_match(query, self._payload(memory)):
                    results.append(memory)
                    if len(results) >= limit:
                        return results
        return results

    def get_relevant_context(self, task: str, project_id: str) -> Dict[str, Any]:
        """Collect memories relevant to ``task`` within ``project_id``.

        Returns:
            ``long_term``: up to 5 matching long-term memories of the project;
            ``strategies``: up to 3 matching strategies of the project;
            ``short_term``: the newest 10 short-term entries (any task/session).
        """
        # project_id used to be ignored, mixing memories from every project.
        return {
            'long_term': self.search_memory(task, 'long_term', limit=5, project_id=project_id),
            'strategies': self.search_memory(task, 'strategy', limit=3, project_id=project_id),
            'short_term': self.short_term_memory[-10:],
        }
记忆系统支持多维度信息检索和上下文构建
多维度评估系统确保AI代理的质量和可靠性
import json
import subprocess
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class TestStatus(Enum):
    """Progress/outcome state of a TestCase."""

    PENDING = "pending"  # default status of a new TestCase
    RUNNING = "running"
    PASSED = "passed"  # counted under "passed" by EvaluationEngine.run_test_suite
    FAILED = "failed"  # counted under "failed"
    ERROR = "error"  # any other status, or an exception while running, counts under "errors"
@dataclass
class TestCase:
    """A single evaluation case plus the outcome fields of running it.

    The first five fields describe the case; the remaining ones default to
    "not yet run" values (no output, PENDING status, no time, no error).
    """

    id: str  # reported as "test_id" in EvaluationEngine's error details
    name: str
    description: str
    input_data: Dict[str, Any]  # NOTE(review): schema is defined by the test runner, not shown here
    expected_output: Any
    # Outcome fields — presumably filled in by the runner once the case executes.
    actual_output: Optional[Any] = None
    status: TestStatus = TestStatus.PENDING
    execution_time: Optional[float] = None  # presumably seconds — confirm with the runner
    error_message: Optional[str] = None
class EvaluationEngine:
    """Runs named test suites and the external SWE-bench evaluation script."""

    def __init__(self, test_suites: Dict[str, List["TestCase"]]):
        """
        Args:
            test_suites: Suite name -> test cases to run.
        """
        self.test_suites = test_suites
        # suite name -> summary dict of its most recent run_test_suite() call
        self.results = {}

    def run_test_suite(self, suite_name: str) -> Dict[str, Any]:
        """Run every case of ``suite_name`` and summarise the outcome.

        Returns:
            A summary with pass/fail/error counts, total execution time and
            per-test details, or ``{"error": ...}`` if the suite is unknown.
            The summary is also stored in ``self.results``.
        """
        if suite_name not in self.test_suites:
            return {"error": f"测试套件不存在: {suite_name}"}

        test_cases = self.test_suites[suite_name]
        results = {
            "suite_name": suite_name,
            "total_tests": len(test_cases),
            "passed": 0,
            "failed": 0,
            "errors": 0,
            "execution_time": 0,
            "details": []
        }

        for test_case in test_cases:
            try:
                result = self._run_single_test(test_case)
                # A missing/None execution time (TestCase.execution_time is
                # Optional) raised TypeError and misreported the test as an error.
                results["execution_time"] += result.get("execution_time") or 0
                status = result["status"]
                if status == TestStatus.PASSED:
                    results["passed"] += 1
                elif status == TestStatus.FAILED:
                    results["failed"] += 1
                else:
                    results["errors"] += 1
                results["details"].append(result)
            except Exception as e:
                # Boundary: a crashing test is recorded, not propagated.
                results["errors"] += 1
                results["details"].append({
                    "test_id": test_case.id,
                    "status": TestStatus.ERROR,
                    "error": str(e)
                })

        self.results[suite_name] = results
        return results

    def run_swebench_evaluation(self, model: str,
                                script: str = "swebench_evaluator.py",
                                timeout: float = 3600) -> Dict[str, Any]:
        """Run the SWE-bench evaluator script for ``model``.

        Args:
            model: Model name passed as ``--model``.
            script: Path of the evaluator script.
            timeout: Seconds before the run is aborted.

        Returns:
            The JSON object printed by the script, or ``{"error": ...}``
            (plus ``"stderr"`` when the script exits non-zero).
        """
        import sys

        # Use the running interpreter: a bare "python" may be missing from
        # PATH or resolve to a different environment.
        cmd = [sys.executable, script, "--model", model]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            return {"error": "SWEBench评测超时"}
        except Exception as e:
            return {"error": f"SWEBench评测异常: {e}"}

        if result.returncode != 0:
            return {
                "error": "SWEBench评测失败",
                "stderr": result.stderr
            }
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as e:
            return {"error": f"SWEBench评测异常: {e}"}
评测引擎支持自动化测试和性能分析
CLI提供灵活的命令行操作体验
import click
from typing import Optional, List
from openhands.core.agent import OpenHandsAgent
from openhands.core.environment import ExecutionEnvironment
class CLI:
    """State holder for the command-line interface: an agent and its environment."""

    def __init__(self):
        # Both start empty; nothing in this excerpt assigns them later.
        self.agent = self.environment = None
@click.group()
def cli():
    """OpenHands AI开发助手"""
    # Root command group; `run` and `review` attach to it via @cli.command().
    # click shows the docstring above as the group's --help text, so it is
    # user-facing and intentionally left as-is.
    pass
@cli.command()
@click.option('--model', default='gpt-4', help='使用的LLM模型')
@click.option('--env', default='local', help='执行环境')
@click.argument('task', type=str)
def run(model: str, env: str, task: str):
    """运行AI代理任务"""
    # (The docstring above is the click --help text.)
    # Runs TASK with an agent for MODEL in execution environment ENV and exits
    # with status 1 on failure so scripts and CI can detect it; the original
    # printed the failure but still exited with status 0.
    click.echo(f"开始执行任务: {task}")
    click.echo(f"使用模型: {model}")
    click.echo(f"执行环境: {env}")

    # NOTE(review): ExecutionEnvironment here is the name imported from
    # openhands.core.environment; the ABC defined in this file has no create().
    environment = ExecutionEnvironment.create(env)
    agent = OpenHandsAgent(
        model=model,
        environment=environment
    )

    try:
        result = agent.execute(task)
    except Exception as e:
        click.echo(f"💥 执行异常: {e}", err=True)
        raise SystemExit(1) from e

    if not result.success:
        click.echo(f"❌ 任务执行失败: {result.error}", err=True)
        raise SystemExit(1)

    click.echo("✅ 任务执行成功")
    if result.output:
        click.echo(f"输出:\n{result.output}")
@cli.command()
@click.option('--project', required=True, help='项目路径')
@click.option('--output', help='输出路径')
def review(project: str, output: Optional[str]):
    """代码审查"""
    # (The docstring above is the click --help text.)
    # Reviews the project at PROJECT; writes a report to OUTPUT when given,
    # otherwise prints each issue to the console.
    # NOTE(review): CodeReviewAgent is not imported in this excerpt — confirm its source.
    click.echo(f"开始审查项目: {project}")
    reviewer = CodeReviewAgent(project_path=project)
    found = reviewer.review()

    if not output:
        for issue in found:
            click.echo(f"🔍 {issue.description}")
            click.echo(f" 位置: {issue.location}")
            click.echo(f" 严重程度: {issue.severity}")
            click.echo()
        return

    reviewer.generate_report(found, output)
    click.echo(f"审查报告已生成: {output}")
CLI模块支持完整的命令行操作功能
GUI提供直观的交互体验和可视化操作
云服务提供高可用性和可扩展性
企业版满足大型组织的安全和管理需求
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import importlib
import inspect
class Plugin(ABC):
    """Interface for extensions managed by PluginManager.

    Subclasses expose identifying metadata through three read-only properties
    and implement an initialize / execute / cleanup lifecycle.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique plugin name; PluginManager uses it as the registry key."""

    @property
    @abstractmethod
    def version(self) -> str:
        """Version string of the plugin."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable summary of what the plugin does."""

    @abstractmethod
    def initialize(self, config: Dict[str, Any]) -> bool:
        """Prepare the plugin with ``config``; a falsy result rejects registration."""

    @abstractmethod
    def execute(self, task: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Perform the plugin's work for ``task`` and return a result mapping."""

    @abstractmethod
    def cleanup(self) -> bool:
        """Release resources; a falsy result keeps the plugin registered."""
class PluginManager:
    """Registers, runs and unloads Plugin instances keyed by their name."""

    def __init__(self):
        self.plugins: Dict[str, "Plugin"] = {}
        self.plugin_configs: Dict[str, Dict[str, Any]] = {}

    def _forget(self, plugin_name: str):
        """Drop a plugin and its config from the registry (no cleanup call)."""
        self.plugins.pop(plugin_name, None)
        self.plugin_configs.pop(plugin_name, None)

    def register_plugin(self, plugin: "Plugin", config: Optional[Dict[str, Any]] = None):
        """Register ``plugin`` and initialise it with ``config`` (default ``{}``).

        A plugin whose ``initialize`` returns a falsy value is removed again.

        Raises:
            ValueError: If a plugin with the same name is already registered.
            Exception: Whatever ``plugin.initialize`` raises; the plugin is
                unregistered before the exception propagates.
        """
        if plugin.name in self.plugins:
            raise ValueError(f"插件已存在: {plugin.name}")
        if config is None:
            config = {}
        self.plugins[plugin.name] = plugin
        self.plugin_configs[plugin.name] = config

        try:
            initialized = plugin.initialize(config)
        except Exception:
            # Previously a crashing initialize() left the plugin registered.
            self._forget(plugin.name)
            raise
        if initialized:
            print(f"插件 {plugin.name} 注册成功")
        else:
            print(f"插件 {plugin.name} 注册失败")
            self._forget(plugin.name)

    def unregister_plugin(self, plugin_name: str):
        """Clean up and remove ``plugin_name``; it stays registered if cleanup fails.

        Unknown names are ignored.
        """
        if plugin_name not in self.plugins:
            return
        if self.plugins[plugin_name].cleanup():
            self._forget(plugin_name)
            print(f"插件 {plugin_name} 注销成功")
        else:
            print(f"插件 {plugin_name} 清理失败")

    def execute_plugin(self, plugin_name: str, task: str,
                       context: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``plugin_name`` on ``task``; returns ``{"error": ...}`` for unknown names."""
        if plugin_name not in self.plugins:
            return {"error": f"插件不存在: {plugin_name}"}
        return self.plugins[plugin_name].execute(task, context)

    def load_plugins_from_directory(self, directory: str):
        """Import every public ``*.py`` module in ``directory`` and register its plugins.

        ``directory`` is used both as a filesystem path (for listing) and as a
        package prefix (for importing), so it must be an importable package
        directory relative to ``sys.path``, e.g. ``"plugins"`` or
        ``"plugins/extra"``. Failures are printed per module/plugin and do not
        stop loading the rest.
        """
        import os

        # "plugins/extra" or "./plugins" -> dotted package "plugins.extra" / "plugins"
        package = os.path.normpath(directory).replace(os.sep, ".")
        for filename in sorted(os.listdir(directory)):
            if not filename.endswith('.py') or filename.startswith('_'):
                continue
            module_name = filename[:-3]
            try:
                module = importlib.import_module(f"{package}.{module_name}")
            except Exception as e:
                print(f"加载插件 {filename} 失败: {e}")
                continue

            for _, obj in inspect.getmembers(module, inspect.isclass):
                # Only concrete Plugin subclasses defined in this module:
                # imported helpers or abstract bases used to be instantiated
                # (TypeError) and abort the whole module.
                if (obj is Plugin or not issubclass(obj, Plugin)
                        or inspect.isabstract(obj)
                        or obj.__module__ != module.__name__):
                    continue
                try:
                    plugin_instance = obj()
                    config = self.plugin_configs.get(plugin_instance.name, {})
                    self.register_plugin(plugin_instance, config)
                except Exception as e:
                    print(f"加载插件 {filename} 失败: {e}")
插件系统支持功能扩展和定制化开发
多部署方案适应不同的基础设施需求
# Dockerfile for OpenHands
FROM python:3.11-slim

# Application root
WORKDIR /app

# System dependencies: git, Node.js toolchain for the frontend, curl
RUN apt-get update && apt-get install -y \
    git \
    nodejs \
    npm \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies first, so this layer stays cached until requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Unprivileged runtime user. It owns /app so the server can create files there
# at runtime; previously everything under /app was root-owned and read-only to it.
RUN useradd -m -u 1000 openhands && chown openhands:openhands /app

# Application code, owned by the runtime user
COPY --chown=openhands:openhands . .
USER openhands

# Build the frontend as the runtime user so node_modules and build output are writable
WORKDIR /app/frontend
RUN npm install && npm run build

# Return to the application root; otherwise the server starts in
# /app/frontend and resolves relative paths there.
WORKDIR /app

# Exposed ports
EXPOSE 3000 8000

# Start the server
CMD ["python", "/app/openhands_server.py"]
Docker容器化部署简化环境配置
多层级配置支持不同场景的定制需求
import os
import json
import yaml
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
class ConfigManager:
    """Layered configuration: built-in defaults, then a JSON/YAML file, then env overrides.

    Keys are addressed with dotted paths such as ``"model.provider"``.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Args:
            config_path: Path of the JSON (``.json``) or YAML (``.yaml``/``.yml``)
                config file; defaults to ``"openhands.yaml"``. Nothing is read
                until load_config() is called.
        """
        self.config_path = config_path or "openhands.yaml"
        self.config = self._load_default_config()

    def _load_default_config(self) -> Dict[str, Any]:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "model": {
                "provider": "openai",
                "model": "gpt-4",
                "temperature": 0.7,
                "max_tokens": 4000
            },
            "environment": {
                "type": "local",
                "workspace": "/tmp/workspace"
            },
            "logging": {
                "level": "INFO",
                "file": "openhands.log",
                "max_size": "10MB",
                "backup_count": 5
            },
            "plugins": {
                "enabled": ["code_generation", "test_execution"],
                "disabled": []
            },
            "security": {
                "allow_file_operations": True,
                "max_file_size": "100MB",
                "allowed_extensions": [".py", ".js", ".java", ".cpp", ".md"]
            }
        }

    def _config_format(self) -> Optional[str]:
        """Return 'json' or 'yaml' based on the file extension, or None if unsupported."""
        if self.config_path.endswith('.json'):
            return 'json'
        if self.config_path.endswith(('.yaml', '.yml')):
            return 'yaml'
        return None

    def load_config(self) -> Dict[str, Any]:
        """Merge the config file (if it exists) and env overrides into self.config.

        Returns:
            The resulting configuration.

        Raises:
            ValueError: If the file exists but has an unsupported extension.
        """
        if os.path.exists(self.config_path):
            fmt = self._config_format()
            if fmt is None:
                raise ValueError("不支持的配置文件格式")
            with open(self.config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f) if fmt == 'json' else yaml.safe_load(f)
            # An empty YAML file loads as None; treat it as "no overrides".
            self.config = self._merge_configs(self.config, user_config or {})

        # Environment-variable overrides
        self._load_env_config()
        return self.config

    def save_config(self) -> bool:
        """Write self.config to config_path; return False (after printing why) on failure."""
        fmt = self._config_format()
        if fmt is None:
            # Checked before opening: open(..., 'w') used to create/truncate
            # the file before the unsupported format was rejected.
            print("保存配置文件失败: 不支持的配置文件格式")
            return False
        try:
            with open(self.config_path, 'w', encoding='utf-8') as f:
                if fmt == 'json':
                    json.dump(self.config, f, indent=2, ensure_ascii=False)
                else:
                    yaml.dump(self.config, f, default_flow_style=False, allow_unicode=True)
            return True
        except Exception as e:
            print(f"保存配置文件失败: {e}")
            return False

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value at dotted ``key``, or ``default`` if any segment is missing."""
        value = self.config
        for part in key.split('.'):
            if not isinstance(value, dict) or part not in value:
                return default
            value = value[part]
        return value

    def set(self, key: str, value: Any) -> bool:
        """Store ``value`` at dotted ``key``, creating intermediate dicts as needed.

        Returns:
            True on success; False (with nothing changed) when an intermediate
            key already holds a non-dict value — previously a TypeError.
        """
        *parents, leaf = key.split('.')
        node = self.config
        for part in parents:
            if part not in node:
                node[part] = {}
            elif not isinstance(node[part], dict):
                return False
            node = node[part]
        node[leaf] = value
        return True
配置管理器支持多源配置和动态更新
完善的日志系统支持问题诊断和性能优化
import logging
import logging.handlers
import json
from datetime import datetime
from typing import Dict, Any
class StructuredLogger:
    """Logger wrapper that emits each event as a JSON-encoded message."""

    # Level names accepted by log_structured(), mapped to logging levels.
    _LEVELS = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }

    def __init__(self, name: str, level: str = "INFO",
                 log_file: Optional[str] = None):
        """Configure the named logger with a console handler and an optional rotating file.

        Any handlers already attached to the logger are removed and closed.

        Args:
            name: Logger name passed to logging.getLogger.
            level: Level name such as "INFO" (case-insensitive).
            log_file: If given, also log to this file (10MB x 5 backups).
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(getattr(logging, level.upper()))

        # Detach AND close existing handlers: re-creating a logger with the
        # same name used to leak the previous file handle.
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
            handler.close()

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        if log_file:
            # Rotate so the log file cannot grow without bound.
            file_handler = logging.handlers.RotatingFileHandler(
                log_file,
                maxBytes=10*1024*1024,  # 10MB
                backupCount=5,
                # Messages are dumped with ensure_ascii=False; without an
                # explicit encoding, non-ASCII text depends on the OS locale.
                encoding='utf-8'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def log_structured(self, level: str, event: str,
                       data: Optional[Dict[str, Any]] = None):
        """Log ``event`` with ``data`` as a JSON message at ``level``.

        ``level`` is case-insensitive; unknown names are logged at INFO
        (previously unknown or lower-case levels were silently dropped).
        """
        level_name = level.upper()
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'level': level_name,
            'event': event,
            'data': data or {}
        }
        log_message = json.dumps(log_data, ensure_ascii=False)
        self.logger.log(self._LEVELS.get(level_name, logging.INFO), log_message)

    def log_agent_action(self, agent_id: str, action: str,
                         result: str, duration: float):
        """Log an agent action and its result as an INFO 'agent_action' event."""
        self.log_structured(
            'INFO',
            'agent_action',
            {
                'agent_id': agent_id,
                'action': action,
                'result': result,
                'duration': duration
            }
        )

    def log_performance(self, operation: str, duration: float,
                        success: bool, details: Optional[Dict[str, Any]] = None):
        """Log a timing measurement as an INFO 'performance' event."""
        self.log_structured(
            'INFO',
            'performance',
            {
                'operation': operation,
                'duration': duration,
                'success': success,
                'details': details or {}
            }
        )

    def log_error(self, error_type: str, error_message: str,
                  context: Optional[Dict[str, Any]] = None):
        """Log an error description as an ERROR 'error' event."""
        self.log_structured(
            'ERROR',
            'error',
            {
                'error_type': error_type,
                'error_message': error_message,
                'context': context or {}
            }
        )
结构化日志系统支持可查询的运行监控
全面的监控系统确保系统的稳定运行
import time
import psutil
import threading
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime
class SystemMetrics:
    """Background sampler of host CPU, memory, disk, network and process metrics (via psutil)."""

    # Number of samples retained in metrics_history (oldest dropped first).
    MAX_HISTORY = 1000

    def __init__(self):
        self.metrics_history: List[Dict[str, Any]] = []
        self.is_collecting = False
        self.collection_thread = None
        # Set by stop_collection() to wake the sampler out of its wait.
        self._stop_event = threading.Event()

    def start_collection(self, interval: float = 5.0):
        """Start sampling every ``interval`` seconds on a daemon thread (no-op if running)."""
        if self.is_collecting:
            return
        self.is_collecting = True
        self._stop_event.clear()
        self.collection_thread = threading.Thread(
            target=self._collect_metrics_loop,
            args=(interval,),
            daemon=True
        )
        self.collection_thread.start()

    def stop_collection(self):
        """Stop sampling and wait for the sampler thread to exit."""
        self.is_collecting = False
        self._stop_event.set()
        if self.collection_thread:
            self.collection_thread.join()

    def _collect_metrics_loop(self, interval: float):
        """Sampler thread body: collect, append, trim, then wait ``interval``."""
        while self.is_collecting:
            try:
                metrics = self._collect_current_metrics()
                self.metrics_history.append(metrics)
                if len(self.metrics_history) > self.MAX_HISTORY:
                    self.metrics_history = self.metrics_history[-self.MAX_HISTORY:]
            except Exception as e:
                print(f"收集指标失败: {e}")
            # Event.wait returns True as soon as stop_collection() sets the
            # event, so stopping no longer blocks for up to `interval` seconds
            # as time.sleep did.
            if self._stop_event.wait(interval):
                break

    def _collect_current_metrics(self) -> Dict[str, Any]:
        """Take one snapshot of CPU, memory, root-disk, network and process-count metrics."""
        import os  # for os.getloadavg; this snippet's own imports omit os

        # NOTE: without an interval, psutil.cpu_percent() measures since the
        # previous call; per the psutil docs the very first sample is 0.0.
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            'timestamp': datetime.now().isoformat(),
            'cpu': {
                'percent': cpu_percent,
                'count': psutil.cpu_count(),
                'load_avg': os.getloadavg() if hasattr(os, 'getloadavg') else None
            },
            'memory': {
                'total': memory.total,
                'available': memory.available,
                'percent': memory.percent,
                'used': memory.used
            },
            'disk': {
                'total': disk.total,
                'used': disk.used,
                'free': disk.free,
                'percent': disk.percent
            },
            'network': self._get_network_metrics(),
            'processes': len(psutil.pids())
        }

    def _get_network_metrics(self) -> Dict[str, Any]:
        """Return system-wide network I/O counters."""
        net_io = psutil.net_io_counters()
        return {
            'bytes_sent': net_io.bytes_sent,
            'bytes_recv': net_io.bytes_recv,
            'packets_sent': net_io.packets_sent,
            'packets_recv': net_io.packets_recv
        }

    def get_metrics_summary(self, hours: int = 24) -> Dict[str, Any]:
        """Summarise CPU/memory percentages of samples newer than ``hours`` ago.

        Returns:
            avg/max/min CPU and memory percent plus the sample count, or
            ``{"error": ...}`` when there are no samples in the window.
        """
        # timedelta was never imported, so this method raised NameError.
        from datetime import timedelta

        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [
            m for m in list(self.metrics_history)
            if datetime.fromisoformat(m['timestamp']) > cutoff_time
        ]
        if not recent_metrics:
            return {"error": "无可用指标数据"}

        cpu = [m['cpu']['percent'] for m in recent_metrics]
        mem = [m['memory']['percent'] for m in recent_metrics]
        return {
            'period_hours': hours,
            'data_points': len(recent_metrics),
            'avg_cpu': sum(cpu) / len(cpu),
            'avg_memory': sum(mem) / len(mem),
            'max_cpu': max(cpu),
            'max_memory': max(mem),
            'min_cpu': min(cpu),
            'min_memory': min(mem)
        }
监控系统实时收集和分析系统运行指标
多层次安全机制保障系统安全
import os
import subprocess
import tempfile
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
class SecurityManager:
    """File validation and command execution helpers with basic safety checks.

    NOTE: these are heuristics, not a security boundary — see
    execute_in_sandbox and _contains_malicious_content.
    """

    # Substrings flagged by _contains_malicious_content(). They must be
    # lowercase because matching runs on lowercased content; the original
    # 'subprocess.Popen' could therefore never match. 'os.system(' is added
    # because 'import os.system' does not occur in valid code.
    MALICIOUS_PATTERNS = (
        'import os.system',
        'os.system(',
        'eval(',
        'exec(',
        'subprocess.run',
        'subprocess.popen',
        'os.popen',
        '__import__(',
    )

    def __init__(self, config: Dict[str, Any]):
        """
        Args:
            config: Reads ``allowed_extensions`` (default []), ``max_file_size``
                (default '100MB'; str like '10MB' or an int byte count) and
                ``sandbox_enabled`` (default True).
        """
        self.config = config
        self.allowed_extensions = config.get('allowed_extensions', [])
        self.max_file_size = config.get('max_file_size', '100MB')
        self.sandbox_enabled = config.get('sandbox_enabled', True)

    def validate_file(self, file_path: str) -> bool:
        """Return True if the file passes the extension, size and content checks.

        Any error while checking (missing file, undecodable bytes, ...) is
        printed and treated as a failed validation.
        """
        try:
            path = Path(file_path)
            if path.suffix not in self.allowed_extensions:
                return False
            if path.stat().st_size > self._parse_size(self.max_file_size):
                return False
            # Simple content check on the first 1000 characters only.
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read(1000)
            if self._contains_malicious_content(content):
                return False
            return True
        except Exception as e:
            print(f"文件验证失败: {e}")
            return False

    def execute_in_sandbox(self, command: str,
                           timeout: int = 30) -> Dict[str, Any]:
        """Run a shell ``command`` inside a throwaway temporary directory.

        SECURITY: this is not real isolation. The command runs through the
        shell with the caller's privileges and no resource limits; only the
        working directory is a fresh temp dir.

        Returns:
            ``success``, ``exit_code``, ``stdout``, ``stderr`` and ``timed_out``.
            ``success`` means the command ran to completion, whatever its exit code.
        """
        if not self.sandbox_enabled:
            return self._execute_command_directly(command, timeout)

        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                # cwd= already places the command in temp_dir; the former
                # unquoted `cd {temp_dir} &&` prefix broke on paths with spaces.
                result = subprocess.run(
                    command,
                    shell=True,
                    timeout=timeout,
                    capture_output=True,
                    text=True,
                    cwd=temp_dir
                )
                return {
                    'success': True,
                    'exit_code': result.returncode,
                    'stdout': result.stdout,
                    'stderr': result.stderr,
                    'timed_out': False
                }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'exit_code': -1,
                'stdout': '',
                'stderr': '命令执行超时',
                'timed_out': True
            }
        except Exception as e:
            return {
                'success': False,
                'exit_code': -1,
                'stdout': '',
                'stderr': str(e),
                'timed_out': False
            }

    def _parse_size(self, size_str) -> int:
        """Convert '10MB' / '2kb' / '1 GB' / '100B' / '100' (or an int) to bytes.

        Raises:
            ValueError: If the numeric part is not an integer.
        """
        if isinstance(size_str, int):
            return size_str
        text = size_str.strip().upper()
        for suffix, factor in (('KB', 1024), ('MB', 1024 ** 2), ('GB', 1024 ** 3)):
            if text.endswith(suffix):
                return int(text[:-2]) * factor
        if text.endswith('B'):
            return int(text[:-1])
        return int(text)

    def _contains_malicious_content(self, content: str) -> bool:
        """Return True if ``content`` contains any MALICIOUS_PATTERNS entry (case-insensitive).

        A plain substring heuristic, not a security control.
        """
        lowered = content.lower()
        return any(pattern in lowered for pattern in self.MALICIOUS_PATTERNS)
安全机制提供基础的文件校验与命令执行隔离；注意示例中的“沙箱”仅将命令放在临时目录中运行，并非真正的隔离环境
性能优化确保系统的高效运行
import time
import pickle
import hashlib
from typing import Dict, Any, Optional, Callable
from functools import wraps
from pathlib import Path
import threading
class CacheManager:
    """Pickle-backed on-disk cache with per-entry expiry and size-based eviction.

    Each entry is ``<cache_dir>/<key>.pkl``; its modification time holds the
    entry's expiry timestamp (written by ``set``).

    SECURITY: entries are loaded with pickle, which can execute arbitrary
    code. Only use a ``cache_dir`` that untrusted users cannot write to; the
    default lives under /tmp, which is typically shared.
    """

    def __init__(self, cache_dir: str = "/tmp/openhands_cache",
                 max_size: int = 1024*1024*1024):  # 1GB
        self.cache_dir = Path(cache_dir)
        # 0o700 limits access to the owner; it applies only if the directory
        # is created here (and is ignored on Windows).
        self.cache_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
        self.max_size = max_size
        self.cache_stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }
        self.lock = threading.RLock()

    def get_cache_key(self, func: Callable, *args, **kwargs) -> str:
        """Derive a cache key from the function identity and its arguments.

        The function is identified by module + qualified name (the bare
        ``__name__`` let same-named functions from different modules collide),
        and kwargs are sorted so keyword order does not matter. Arguments are
        keyed by ``str()``, so objects whose repr embeds a memory address
        never produce a hit.
        """
        key_data = {
            'function': (f"{getattr(func, '__module__', '')}."
                         f"{getattr(func, '__qualname__', getattr(func, '__name__', repr(func)))}"),
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        return hashlib.md5(str(key_data).encode()).hexdigest()

    def get(self, cache_key: str) -> Optional[Any]:
        """Return the cached value, or None when missing, expired or unreadable.

        A stored value of None is indistinguishable from a miss.
        """
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        try:
            with self.lock:
                if not cache_file.exists():
                    self.cache_stats['misses'] += 1
                    return None
                # mtime is the expiry time written by set(). The original
                # compared it against a fixed 3600s window, so entries lived
                # ttl + 1 hour.
                if time.time() > cache_file.stat().st_mtime:
                    cache_file.unlink()
                    self.cache_stats['misses'] += 1
                    return None
                with open(cache_file, 'rb') as f:
                    data = pickle.load(f)
                self.cache_stats['hits'] += 1
                return data
        except Exception as e:
            print(f"读取缓存失败: {e}")
            with self.lock:
                self.cache_stats['misses'] += 1
            return None

    def set(self, cache_key: str, data: Any, ttl: int = 3600):
        """Store ``data`` under ``cache_key`` for ``ttl`` seconds (errors are printed, not raised)."""
        import os

        cache_file = self.cache_dir / f"{cache_key}.pkl"
        try:
            with self.lock:
                if self._check_cache_size():
                    self._evict_old_cache()
                with open(cache_file, 'wb') as f:
                    pickle.dump(data, f)
                # Record the expiry time in the file's mtime (read back by get()).
                expires_at = time.time() + ttl
                os.utime(cache_file, (expires_at, expires_at))
        except Exception as e:
            print(f"写入缓存失败: {e}")

    def _check_cache_size(self) -> bool:
        """Return True when the cache files together reach ``max_size`` bytes."""
        total_size = 0
        for path in self.cache_dir.glob("*.pkl"):
            try:
                total_size += path.stat().st_size
            except FileNotFoundError:
                continue  # removed concurrently
        return total_size >= self.max_size

    def _evict_old_cache(self):
        """Delete the quarter of entries closest to expiry (at least one)."""
        cache_files = list(self.cache_dir.glob("*.pkl"))
        # mtime holds the expiry time, so the earliest-expiring go first.
        cache_files.sort(key=lambda f: f.stat().st_mtime)
        # With fewer than four files `len // 4` was 0, so nothing was evicted
        # and the cache could grow past max_size indefinitely.
        files_to_remove = max(1, len(cache_files) // 4) if cache_files else 0
        for path in cache_files[:files_to_remove]:
            try:
                path.unlink()
                self.cache_stats['evictions'] += 1
            except Exception as e:
                print(f"删除缓存文件失败: {e}")
def cache_result(ttl: int = 3600):
    """Decorator factory that caches a function's results on disk via CacheManager.

    Args:
        ttl: Lifetime of each cached result in seconds.

    NOTE: a result of None is indistinguishable from a cache miss, so calls
    returning None are re-executed every time.
    """
    def decorator(func):
        # One manager per decorated function, created on first call; the
        # original built a new CacheManager (running mkdir) on every call.
        manager = None

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal manager
            if manager is None:
                manager = CacheManager()
            cache_key = manager.get_cache_key(func, *args, **kwargs)

            cached_result = manager.get(cache_key)
            if cached_result is not None:
                return cached_result

            result = func(*args, **kwargs)
            manager.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
缓存系统显著提升重复任务的执行效率
活跃的社区生态系统推动持续创新
清晰的路线图指引项目发展方向
广泛的应用场景覆盖各类开发需求
OpenHands正在重新定义AI与软件开发的关系
感谢阅读!
访问 https://atcfu.com/ai-articles/openhands-ai-driven-development/ 回顾本文