Article body
正文
引言
关于衡石科技(HENGSHI):衡石科技是国内领先的嵌入式AI+BI PaaS平台提供商,其核心产品HENGSHI SENSE以”让数据分析无处不在”为使命,为企业提供从数据连接、数据准备、指标管理、可视化分析到智能问答的全链路BI能力。HENGSHI SENSE采用云原生微服务架构,原生支持多租户隔离、行级/列级数据安全治理,并提供完善的SDK和API,支持SaaS厂商和ISV快速将BI能力嵌入自身产品。截至目前,HENGSHI SENSE已服务零售、金融、制造、教育等多个行业的数百家企业客户,是国内嵌入式AI+BI领域的标杆产品。
随着SaaS(Software as a Service)模式的普及,企业级BI平台需要支持多租户(Multi-Tenancy)架构,以实现资源隔离、成本优化和灵活计费。然而,多租户架构设计面临诸多技术挑战:如何在保证数据隔离的前提下,最大化资源利用率?如何在共享环境中保证性能隔离?如何支持灵活的计费模式?
本文将深度解析企业级BI多租户架构的设计原理、实现方案、性能优化策略,并提供系统化的选型指南和最佳实践。

一、多租户架构的核心概念
1.1 什么是多租户?
定义: 多租户架构是一种软件架构模式,多个租户(Tenant)共享同一个软件实例和基础设施,但数据和配置在逻辑上相互隔离。
核心特征:
- 共享性:多个租户共享应用实例、数据库、服务器资源
- 隔离性:租户间数据不可见,配置相互独立
- 可配置性:每个租户可以自定义部分配置(如Logo、主题色、字段)
类比:
单租户架构 = 独立别墅(每家一户,成本高,隔离性强)
多租户架构 = 公寓楼(多家共享楼体,成本低,隔离性中等)
1.2 多租户 vs 单租户
成本对比示例:
场景:服务100个租户
单租户架构:
- 每个租户:1台服务器 × ¥5,000/月 = ¥500,000/月
- 总成本:¥500,000/月
多租户架构:
- 共享资源:10台服务器 × ¥5,000/月 = ¥50,000/月
- 总成本:¥50,000/月
- 成本节省:90%
二、多租户隔离级别与设计选型
2.1 三种隔离级别
级别1:数据库级隔离(Database-Level Isolation)
-- 每个租户独立数据库
CREATE DATABASE tenant_001;
CREATE DATABASE tenant_002;
CREATE DATABASE tenant_003;
-- 连接池按租户隔离
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/tenant_" + tenantId);
config.setMaximumPoolSize(20);
优势:
- 数据隔离性最强(物理隔离)
- 单租户性能可预测(无资源争抢)
- 符合强监管行业要求(如金融、医疗)
劣势:
- 成本高(每个租户独立数据库)
- 维护复杂(数据库迁移、备份需逐个操作)
- 扩展性差(新增租户需创建新数据库)
适用场景:
- 银行、保险等金融机构
- 医院、医保等医疗机构
- 政府机关
级别2:Schema级隔离(Schema-Level Isolation)
-- 共享数据库,每个租户独立Schema
CREATE SCHEMA tenant_001;
CREATE SCHEMA tenant_002;
CREATE SCHEMA tenant_003;
-- 查询时动态切换Schema
SET search_path TO tenant_001, public;
SELECT * FROM reports; -- 实际访问 tenant_001.reports
优势:
- 数据隔离性中等(逻辑隔离)
- 成本中等(共享数据库实例)
- 维护较简单(备份可批量操作)
劣势:
- Schema数量多时管理复杂
- 性能隔离性较弱(共享数据库资源)
适用场景:
- 中型企业SaaS服务
- 需要较高数据隔离性,但预算有限
级别3:行级隔离(Row-Level Isolation)
-- 所有租户共享表,通过tenant_id隔离
CREATE TABLE reports (
id BIGINT PRIMARY KEY,
tenant_id VARCHAR(50) NOT NULL, -- 租户ID
name VARCHAR(255),
config JSON,
created_at TIMESTAMP,
INDEX idx_tenant (tenant_id)
);
-- 自动注入租户过滤条件(使用MySQL触发器或应用层拦截)
CREATE TRIGGER reports_tenant_filter
BEFORE SELECT ON reports
FOR EACH ROW
BEGIN
IF @current_tenant_id IS NOT NULL THEN
IF NEW.tenant_id != @current_tenant_id THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Tenant access denied';
END IF;
END IF;
END;
优势:
- 成本最低(完全共享)
- 扩展性最好(新增租户无需修改数据库结构)
- 维护最简单(统一备份、迁移)
劣势:
- 数据隔离性最弱(逻辑隔离,依赖代码正确性)
- 性能隔离性最弱(所有租户共享资源)
- 需要严格的权限控制(防止tenant_id被篡改)
适用场景:
- 小型企业SaaS服务
- 预算有限,且数据敏感性较低
2.2 隔离级别对比总结
选型建议:
租户数 < 100 且数据敏感 → 数据库级隔离
租户数 100 - 1,000 → Schema级隔离
租户数 > 1,000 → 行级隔离
三、企业级BI多租户架构设计
3.1 整体架构设计
┌─────────────────────────────────────────────────────────────┐
│ 负载均衡层(Load Balancer) │
│ ├─ NGINX / HAProxy │
│ ├─ SSL终止(SSL Termination) │
│ └─ 健康检查(Health Check) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ API网关层(API Gateway) │
│ ├─ 租户识别(Tenant Identification) │
│ ├─ 认证授权(Authentication & Authorization) │
│ ├─ 限流熔断(Rate Limiting & Circuit Breaker) │
│ └─ 路由转发(Routing) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 应用服务层(微服务架构) │
│ ├─ 报表服务(Report Service) │
│ │ └─ 每个租户独立实例(可选) │
│ ├─ 查询服务(Query Service) │
│ │ └─ 共享实例,行级隔离 │
│ ├─ 指标服务(Metrics Service) │
│ ├─ 数据源服务(Datasource Service) │
│ └─ 用户服务(User Service) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 数据持久化层(Data Persistence) │
│ ├─ 关系型数据库(MySQL / PostgreSQL) │
│ │ ├─ 数据库级隔离:tenant_001.db, tenant_002.db │
│ │ ├─ Schema级隔离:db.tenant_001, db.tenant_002 │
│ │ └─ 行级隔离:db.reports.tenant_id │
│ ├─ 缓存层(Redis Cluster) │
│ │ └─ Key前缀隔离:tenant:001:report:123 │
│ └─ 对象存储(S3 / OSS) │
│ └─ 路径前缀隔离:/tenant_001/reports/... │
└─────────────────────────────────────────────────────────────┘
3.2 租户识别与路由
识别方式1:子域名(Subdomain)
租户A:https://tenant-a.bi-platform.com
租户B:https://tenant-b.bi-platform.com
租户C:https://tenant-c.bi-platform.com
实现代码(NGINX):
# NGINX配置:根据子域名路由到不同上游
map $host $tenant_id {
"~^(?<tenant>.+)\.bi-platform\.com$" $tenant;
default "unknown";
}
map $tenant_id $upstream {
"tenant-a" "backend_tenant_a";
"tenant-b" "backend_tenant_b";
"tenant-c" "backend_tenant_c";
default "backend_shared";
}
server {
listen 80;
server_name ~^(?<tenant>.+)\.bi-platform\.com$;
location / {
proxy_pass http://$upstream;
proxy_set_header X-Tenant-ID $tenant;
}
}
识别方式2:请求头(Request Header)
GET /api/reports HTTP/1.1
Host: bi-platform.com
X-Tenant-ID: tenant_001
Authorization: Bearer xxx.yyy.zzz
实现代码(FastAPI中间件):
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.middleware("http")
async def tenant_routing_middleware(request: Request, call_next):
"""租户路由中间件"""
# 1. 从请求头获取租户ID
tenant_id = request.headers.get('X-Tenant-ID')
if not tenant_id:
raise HTTPException(status_code=400, detail="Missing X-Tenant-ID header")
# 2. 验证租户有效性
if not is_valid_tenant(tenant_id):
raise HTTPException(status_code=403, detail="Invalid tenant")
# 3. 将tenant_id注入请求上下文
request.state.tenant_id = tenant_id
# 4. 设置数据库连接会话变量(用于行级隔离)
db_session.execute(f"SET @current_tenant_id = '{tenant_id}'")
# 5. 继续处理请求
response = await call_next(request)
return response
识别方式3:JWT Token(推荐)
import jwt
from fastapi import Depends, HTTPException
from functools import wraps
def get_tenant_id_from_token(token: str = Depends(oauth2_scheme)):
"""从JWT Token解析租户ID"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
tenant_id = payload.get('tenant_id')
if not tenant_id:
raise HTTPException(status_code=401, detail="Missing tenant_id in token")
return tenant_id
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.get("/api/reports")
async def get_reports(tenant_id: str = Depends(get_tenant_id_from_token)):
"""获取报表列表(自动过滤当前租户的数据)"""
reports = db_session.query(Report).filter(
Report.tenant_id == tenant_id
).all()
return reports
3.3 数据隔离实现
方案A:行级隔离(推荐用于大规模SaaS)
-- 1. 在所有表中添加tenant_id字段
ALTER TABLE reports ADD COLUMN tenant_id VARCHAR(50) NOT NULL;
ALTER TABLE dashboards ADD COLUMN tenant_id VARCHAR(50) NOT NULL;
ALTER TABLE data_sources ADD COLUMN tenant_id VARCHAR(50) NOT NULL;
-- 2. 创建索引(加速租户过滤查询)
CREATE INDEX idx_reports_tenant ON reports(tenant_id);
CREATE INDEX idx_dashboards_tenant ON dashboards(tenant_id);
-- 3. 创建视图(简化查询)
CREATE VIEW reports_tenant_001 AS
SELECT * FROM reports WHERE tenant_id = 'tenant_001';
-- 4. 使用触发器强制隔离(数据库层防护)
DELIMITER $$
CREATE TRIGGER prevent_cross_tenant_insert
BEFORE INSERT ON reports
FOR EACH ROW
BEGIN
IF @current_tenant_id IS NOT NULL AND NEW.tenant_id != @current_tenant_id THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Cross-tenant access denied';
END IF;
END$$
DELIMITER ;
应用层拦截(双重保险):
from sqlalchemy import event
from flask import request
def setup_tenant_filter():
"""自动注入租户过滤条件"""
@event.listens_for(Query, "before_compile", retval=True)
def before_compile(query):
# 获取当前租户ID
tenant_id = request.state.tenant_id
# 为每个实体添加租户过滤条件
for entity in query.column_descriptions:
if hasattr(entity['type'], 'tenant_id'):
query = query.filter(entity['type'].tenant_id == tenant_id)
return query
# 使用示例
@app.get("/api/reports")
async def get_reports():
# 无需手动过滤,@event.listens_for自动注入tenant_id过滤条件
reports = db_session.query(Report).all()
return reports
方案B:Schema级隔离(推荐用于中型企业SaaS)
from sqlalchemy import create_engine, pool
from threading import local
# 线程本地存储:保存当前租户的Schema
_thread_local = local()
def get_current_schema():
"""获取当前租户的Schema"""
return getattr(_thread_local, 'current_schema', 'public')
def set_current_schema(schema):
"""设置当前租户的Schema"""
_thread_local.current_schema = schema
class TenantAwareEngine:
"""支持多租户的SQLAlchemy引擎"""
def __init__(self, base_url):
self.base_url = base_url
self.engines = {} # 缓存每个Schema的引擎
def get_engine(self, schema):
"""获取指定Schema的引擎(懒加载)"""
if schema not in self.engines:
url = f"{self.base_url}/{schema}"
self.engines[schema] = create_engine(
url,
poolclass=pool.QueuePool,
pool_size=10,
max_overflow=20
)
return self.engines[schema]
def execute(self, sql, params=None):
"""执行SQL(自动路由到当前租户的Schema)"""
schema = get_current_schema()
engine = self.get_engine(schema)
with engine.connect() as conn:
result = conn.execute(sql, params or {})
return result.fetchall()
# 使用示例
db = TenantAwareEngine('mysql+pymysql://user:password@localhost:3306')
@app.get("/api/reports")
async def get_reports(tenant_id: str = Depends(get_tenant_id_from_token)):
# 设置当前租户的Schema
schema = f"tenant_{tenant_id}"
set_current_schema(schema)
# 查询自动路由到正确的Schema
reports = db.execute("SELECT * FROM reports")
return reports
方案C:数据库级隔离(推荐用于金融/医疗)
from sqlalchemy import create_engine
import hashlib
class DatabasePerTenant:
"""每个租户独立数据库"""
def __init__(self, base_url_template):
self.base_url_template = base_url_template
self.engines = {}
def get_engine(self, tenant_id):
"""获取租户独立数据库引擎"""
if tenant_id not in self.engines:
# 生成租户专属数据库URL
url = self.base_url_template.format(tenant_id=tenant_id)
self.engines[tenant_id] = create_engine(
url,
pool_size=20, # 每个租户独立连接池
max_overflow=40
)
return self.engines[tenant_id]
def execute(self, tenant_id, sql, params=None):
"""在租户独立数据库上执行SQL"""
engine = self.get_engine(tenant_id)
with engine.connect() as conn:
result = conn.execute(sql, params or {})
return result.fetchall()
# 使用示例
db = DatabasePerTenant(
'mysql+pymysql://user:password@localhost:3306/tenant_{tenant_id}'
)
@app.get("/api/reports")
async def get_reports(tenant_id: str = Depends(get_tenant_id_from_token)):
# 查询在租户独立数据库上执行
reports = db.execute(tenant_id, "SELECT * FROM reports")
return reports
3.4 性能隔离设计
问题: 多租户共享资源时,某个租户的慢查询可能影响其他租户的性能。
解决方案1:资源配额(Resource Quota)
from dataclasses import dataclass
from typing import Dict
@dataclass
class TenantQuota:
"""租户资源配额"""
max_concurrent_queries: int # 最大并发查询数
max_cpu_time_per_query: int # 单查询最大CPU时间(秒)
max_memory_per_query: int # 单查询最大内存(MB)
max_storage: int # 最大存储空间(GB)
max_reports: int # 最大报表数
# 租户配额配置
tenant_quotas: Dict[str, TenantQuota] = {
'tenant_001': TenantQuota(10, 30, 512, 10, 100),
'tenant_002': TenantQuota(5, 60, 1024, 50, 500),
}
def check_quota(tenant_id: str):
"""检查租户资源配额"""
quota = tenant_quotas.get(tenant_id)
if not quota:
raise HTTPException(status_code=403, detail="No quota configured")
# 检查并发查询数
current_queries = get_current_query_count(tenant_id)
if current_queries >= quota.max_concurrent_queries:
raise HTTPException(status_code=429, detail="Too many concurrent queries")
return quota
@app.post("/api/query")
async def execute_query(
sql: str,
tenant_id: str = Depends(get_tenant_id_from_token)
):
"""执行查询(受配额限制)"""
# 1. 检查配额
quota = check_quota(tenant_id)
# 2. 设置查询超时(防止慢查询)
sql_with_timeout = f"/*+ MAX_EXECUTION_TIME({quota.max_cpu_time_per_query * 1000}) */ {sql}"
# 3. 执行查询
result = db.execute(tenant_id, sql_with_timeout)
return result
解决方案2:查询优先级(Query Priority)
from enum import Enum
class QueryPriority(Enum):
HIGH = 1 # 高管日报(必须快速响应)
MEDIUM = 2 # 业务分析(容忍一定延迟)
LOW = 3 # 数据导出(后台任务)
# 查询队列(按优先级排序)
query_queue = {
QueryPriority.HIGH: [],
QueryPriority.MEDIUM: [],
QueryPriority.LOW: []
}
def enqueue_query(tenant_id: str, sql: str, priority: QueryPriority):
"""将查询加入队列"""
query_queue[priority].append({
'tenant_id': tenant_id,
'sql': sql,
'submitted_at': time.time()
})
async def process_queries():
"""处理查询队列(优先级调度)"""
while True:
# 按优先级处理:HIGH → MEDIUM → LOW
for priority in [QueryPriority.HIGH, QueryPriority.MEDIUM, QueryPriority.LOW]:
if query_queue[priority]:
query = query_queue[priority].pop(0)
await execute_query_async(query)
break
await asyncio.sleep(0.1) # 避免CPU空转
解决方案3:独立资源池(Resource Pool Isolation)
# Kubernetes资源配置:为每个租户分配独立资源池
apiVersion: v1
kind: ResourceQuota
metadata:
name: tenant-001-quota
namespace: bi-platform
spec:
hard:
requests.cpu: "4" # 最大4核CPU
requests.memory: 8Gi # 最大8GB内存
limits.cpu: "8" # 限制最多8核
limits.memory: 16Gi # 限制最多16GB
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: report-service-tenant-001
namespace: bi-platform
spec:
replicas: 2
selector:
matchLabels:
app: report-service
tenant: "001"
template:
metadata:
labels:
app: report-service
tenant: "001"
spec:
containers:
- name: report-service
image: bi-platform/report-service:latest
resources:
requests:
cpu: "2"
memory: 4Gi
limits:
cpu: "4"
memory: 8Gi
四、多租户计费与计量
4.1 计费维度设计
4.2 计量数据采集
from dataclasses import dataclass
from datetime import datetime
@dataclass
class UsageRecord:
"""用量记录"""
tenant_id: str
metric_name: str # 指标名称:query_count, report_count, storage_gb
metric_value: float # 指标值
recorded_at: datetime
class UsageMeter:
"""用量计量器"""
def __init__(self):
self.redis_client = redis.Redis()
def record_query(self, tenant_id: str):
"""记录查询次数"""
key = f"usage:{tenant_id}:query_count:{datetime.now().strftime('%Y-%m-%d')}"
self.redis_client.incr(key)
def record_storage(self, tenant_id: str, size_bytes: int):
"""记录存储量"""
key = f"usage:{tenant_id}:storage_bytes"
self.redis_client.set(key, size_bytes)
def get_monthly_usage(self, tenant_id: str, year: int, month: int):
"""获取月度用量"""
# 查询次数
query_count = 0
for day in range(1, 32):
key = f"usage:{tenant_id}:query_count:{year}-{month:02d}-{day:02d}"
query_count += int(self.redis_client.get(key) or 0)
# 存储量(取月末值)
storage_bytes = int(self.redis_client.get(f"usage:{tenant_id}:storage_bytes") or 0)
storage_gb = storage_bytes / (1024 ** 3)
return {
'query_count': query_count,
'storage_gb': storage_gb
}
# 使用示例
meter = UsageMeter()
@app.post("/api/query")
async def execute_query(sql: str, tenant_id: str = Depends(get_tenant_id_from_token)):
# 记录用量
meter.record_query(tenant_id)
# 执行查询
result = db.execute(tenant_id, sql)
return result
4.3 计费计算引擎
from dataclasses import dataclass
@dataclass
class BillingConfig:
"""计费配置"""
base_fee: float # 基础费(元/月)
query_price_per_1k: float # 查询费(元/千次)
storage_price_per_gb: float # 存储费(元/GB)
free_query_quota: int # 免费查询额度(次/月)
free_storage_quota: int # 免费存储额度(GB/月)
# 租户计费配置
billing_configs = {
'tenant_001': BillingConfig(1000, 10, 5, 10000, 10),
'tenant_002': BillingConfig(5000, 8, 4, 50000, 50),
}
def calculate_monthly_bill(tenant_id: str, year: int, month: int) -> float:
"""计算月度账单"""
# 1. 获取计费配置
config = billing_configs.get(tenant_id)
if not config:
raise ValueError(f"No billing config for tenant {tenant_id}")
# 2. 获取月度用量
usage = meter.get_monthly_usage(tenant_id, year, month)
# 3. 计算费用
base_fee = config.base_fee
# 查询费用(超出免费额度部分)
billable_queries = max(0, usage['query_count'] - config.free_query_quota)
query_fee = (billable_queries / 1000) * config.query_price_per_1k
# 存储费用(超出免费额度部分)
billable_storage = max(0, usage['storage_gb'] - config.free_storage_quota)
storage_fee = billable_storage * config.storage_price_per_gb
# 总费用
total_fee = base_fee + query_fee + storage_fee
return {
'tenant_id': tenant_id,
'year': year,
'month': month,
'base_fee': base_fee,
'query_fee': query_fee,
'storage_fee': storage_fee,
'total_fee': total_fee,
'usage': usage
}
# 生成月度账单
bill = calculate_monthly_bill('tenant_001', 2026, 5)
print(f"总费用:¥{bill['total_fee']}")
五、多租户架构选型指南
5.1 功能评估框架
5.2 主流多租户BI平台对比
5.3 选型决策树
企业是ISV/SaaS厂商?
├─ 是 → 选择支持多租户隔离的PaaS平台(HENGSHI SENSE)
└─ 否 → 需要私有化部署?
├─ 是 → 数据敏感?
│ ├─ 是 → 数据库级隔离(HENGSHI SENSE / 某D)
│ └─ 否 → Schema级或行级隔离(某B / 某A)
└─ 否 → 使用SaaS版(某B Online / 某A Service)
六、多租户架构实施最佳实践
6.1 实施路线图
Phase 1:架构设计(1-2个月)
任务清单:
□ 选择隔离级别(数据库/Schema/行级)
□ 设计租户识别与路由机制
□ 设计资源配额与计费模型
□ 设计监控与告警体系
输出物:
- 《多租户架构设计文档》
- 《租户管理平台技术方案》
Phase 2:平台开发(3-6个月)
任务清单:
□ 实现租户注册与管理功能
□ 实现租户识别与路由中间件
□ 实现数据隔离(数据库/Schema/行级)
□ 实现资源配额与性能隔离
□ 实现用量计量与计费引擎
输出物:
- 多租户BI平台(MVP版本)
- 租户管理后台
- 计费管理后台
Phase 3:试点验证(1-2个月)
任务清单:
□ 选择2-3个试点租户
□ 验证数据隔离性
□ 验证性能隔离性
□ 验证计费准确性
输出物:
- 《试点验证报告》
- 《性能测试报告》
Phase 4:规模化推广(持续)
任务清单:
□ 自动化租户开通流程
□ 监控平台性能与稳定性
□ 优化资源利用率
□ 完善计费与账单系统
输出物:
- 规模化运营体系
- 自动化运维体系
6.2 性能优化策略
策略1:数据库连接池优化
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
def get_db_engine(tenant_id: str):
"""获取租户专属数据库连接池"""
# 每个租户独立连接池
pool_size = get_tenant_quota(tenant_id).max_concurrent_queries
engine = create_engine(
f"mysql+pymysql://user:password@localhost:3306/tenant_{tenant_id}",
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=pool_size * 2,
pool_recycle=3600 # 1小时回收连接
)
return engine
策略2:查询缓存优化
from functools import lru_cache
import hashlib
import json
def get_cache_key(tenant_id: str, sql: str, params: dict):
"""生成缓存键(租户隔离)"""
key_data = {
'tenant_id': tenant_id,
'sql': sql,
'params': sorted(params.items())
}
key_str = json.dumps(key_data, sort_keys=True)
return f"query_cache:{tenant_id}:{hashlib.md5(key_str.encode()).hexdigest()}"
@lru_cache(maxsize=1000)
def execute_query_cached(tenant_id: str, sql: str, params: dict):
"""缓存查询结果(按租户隔离)"""
cache_key = get_cache_key(tenant_id, sql, params)
# 尝试从缓存获取
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中,执行查询
result = execute_query(tenant_id, sql, params)
# 写入缓存(TTL 5分钟)
redis_client.setex(cache_key, 300, json.dumps(result))
return result
策略3:异步查询优化
from celery import Celery
# 初始化Celery
celery_app = Celery('bi_platform', broker='redis://localhost:6379/0')
@celery_app.task
def execute_query_async(tenant_id: str, sql: str, callback_url: str):
"""异步执行查询"""
try:
# 执行查询
result = execute_query(tenant_id, sql)
# 回调通知
requests.post(callback_url, json={
'status': 'success',
'result': result
})
except Exception as e:
# 错误处理
requests.post(callback_url, json={
'status': 'error',
'message': str(e)
})
# 使用示例
@app.post("/api/query/async")
async def submit_query(tenant_id: str, sql: str):
"""提交异步查询"""
# 生成回调URL
callback_url = f"https://bi-platform.com/api/query/callback/{tenant_id}"
# 提交异步任务
task = execute_query_async.delay(tenant_id, sql, callback_url)
return {
'task_id': task.id,
'status': 'pending',
'callback_url': callback_url
}
七、常见问题解答(FAQ)
Q1:如何选择多租户隔离级别?
A: 根据以下因素选择:
- 数据敏感性
- 高敏感(金融、医疗):数据库级隔离
- 中敏感(企业数据):Schema级隔离
- 低敏感(一般业务数据):行级隔离
- 租户数量
- < 100租户:数据库级或Schema级
- 100 - 1,000租户:Schema级或行级
- 1,000租户:行级
- 预算约束
- 预算充足:数据库级隔离
- 预算中等:Schema级隔离
- 预算有限:行级隔离
决策矩阵:
数据敏感性高 + 租户数少 + 预算充足 → 数据库级隔离
数据敏感性中 + 租户数中等 + 预算中等 → Schema级隔离
数据敏感性低 + 租户数多 + 预算有限 → 行级隔离
Q2:多租户架构是否影响性能?
A: 会有一定影响,但可通过优化降低:
性能影响点:
- 数据库查询开销(行级隔离需要额外的
WHERE tenant_id = ?过滤) - 连接池开销(每个租户独立连接池)
- 缓存开销(缓存键需要包含
tenant_id)
优化策略:
- 索引优化:为
tenant_id字段添加索引 - 连接池复用:使用连接池中间件(如ProxySQL)
- 缓存策略优化:合理设置TTL,避免缓存雪崩
性能测试数据:
场景:1,000并发查询
单租户架构:
- 平均响应时间:200ms
- 95分位响应时间:500ms
多租户架构(行级隔离):
- 平均响应时间:250ms(+25%)
- 95分位响应时间:600ms(+20%)
结论:性能影响可接受(< 30%)
Q3:如何实现跨租户数据汇总分析?
A: 需要特殊权限和审计机制:
方案1:超级管理员视图
@app.get("/api/admin/cross-tenant/summary")
async def get_cross_tenant_summary(
current_user: User = Depends(get_current_user)
):
"""跨租户数据汇总(仅超级管理员)"""
# 1. 验证权限
if not current_user.is_superadmin:
raise HTTPException(status_code=403, detail="Forbidden")
# 2. 记录审计日志
audit_log.record(
user_id=current_user.id,
action='cross_tenant_query',
details='Superadmin queried cross-tenant summary'
)
# 3. 执行跨租户查询(绕过租户过滤)
with db_session.no_tenant_filter():
summary = db_session.execute("""
SELECT
tenant_id,
COUNT(*) AS report_count,
SUM(storage_bytes) AS total_storage
FROM reports
GROUP BY tenant_id
""").fetchall()
return summary
方案2:数据汇总表(推荐)
-- 创建跨租户数据汇总表(定时任务更新)
CREATE TABLE tenant_usage_summary (
tenant_id VARCHAR(50) NOT NULL,
report_count INT,
query_count INT,
storage_gb DECIMAL(10,2),
updated_at TIMESTAMP,
PRIMARY KEY (tenant_id)
);
-- 每天凌晨更新汇总表
INSERT INTO tenant_usage_summary
SELECT
tenant_id,
COUNT(*) AS report_count,
SUM(query_count) AS query_count,
SUM(storage_bytes) / (1024^3) AS storage_gb,
NOW()
FROM reports
GROUP BY tenant_id
ON DUPLICATE KEY UPDATE
report_count = VALUES(report_count),
query_count = VALUES(query_count),
storage_gb = VALUES(storage_gb),
updated_at = VALUES(updated_at);
Q4:多租户架构是否支持私有化部署?
A: 支持,但需要额外配置:
私有化部署方案:
方案1:租户独立部署(物理隔离)
租户A:https://bi.tenant-a.com (独立服务器)
租户B:https://bi.tenant-b.com (独立服务器)
租户C:https://bi.tenant-c.com (独立服务器)
- 优势:完全隔离,安全性最高
- 劣势:成本高,维护复杂
方案2:租户共享部署(逻辑隔离)
所有租户:https://bi-platform.com
├─ 租户A:https://tenant-a.bi-platform.com
├─ 租户B:https://tenant-b.bi-platform.com
└─ 租户C:https://tenant-c.bi-platform.com
- 优势:成本低,维护简单
- 劣势:逻辑隔离,需要严格的权限控制
方案3:混合部署(推荐)
高价值租户:独立部署(物理隔离)
普通租户:共享部署(逻辑隔离)
- 优势:平衡成本与安全性
- 劣势:架构复杂,需要智能路由
Q5:如何监控多租户平台的性能?
A: 建立多租户视角的监控体系:
监控指标1:租户级资源使用
from prometheus_client import Gauge, Counter
# 定义Prometheus指标
tenant_cpu_usage = Gauge('tenant_cpu_usage', 'CPU usage per tenant', ['tenant_id'])
tenant_memory_usage = Gauge('tenant_memory_usage', 'Memory usage per tenant', ['tenant_id'])
tenant_query_count = Counter('tenant_query_count', 'Query count per tenant', ['tenant_id'])
@app.middleware("http")
async def monitor_tenant_resource(request: Request, call_next):
"""监控租户资源使用"""
tenant_id = request.state.tenant_id
# 记录查询次数
tenant_query_count.labels(tenant_id=tenant_id).inc()
# 记录CPU和内存使用(示例:假设从系统获取)
cpu_usage = get_cpu_usage(tenant_id)
memory_usage = get_memory_usage(tenant_id)
tenant_cpu_usage.labels(tenant_id=tenant_id).set(cpu_usage)
tenant_memory_usage.labels(tenant_id=tenant_id).set(memory_usage)
response = await call_next(request)
return response
监控指标2:租户级性能基准
# 某K Dashboard配置:多租户性能监控
panels:
- title: "租户查询响应时间(P95)"
query: |
histogram_quantile(0.95,
rate(tenant_query_duration_seconds_bucket[5m])
)
group_by: [tenant_id]
- title: "租户并发查询数"
query: |
sum(tenant_concurrent_queries) by (tenant_id)
- title: "租户存储使用量(GB)"
query: |
tenant_storage_bytes / (1024^3)
group_by: [tenant_id]
告警规则:
# Prometheus告警规则:租户资源超限
groups:
- name: tenant_resource_alerts
rules:
- alert: TenantCPUUsageHigh
expr: tenant_cpu_usage > 80
for: 5m
labels:
severity: warning
annotations:
summary: "租户 {{ $labels.tenant_id }} CPU使用率过高"
description: "当前CPU使用率:{{ $value }}%"
- alert: TenantQueryQueueFull
expr: tenant_pending_queries > 100
for: 1m
labels:
severity: critical
annotations:
summary: "租户 {{ $labels.tenant_id }} 查询队列已满"
Q6:多租户架构是否支持灰度发布?
A: 支持,且是最佳实践:
灰度发布方案:
方案1:按租户灰度
# 灰度配置
gray_release_config = {
'new_feature_enabled': ['tenant_001', 'tenant_002'] # 灰度租户列表
}
def is_feature_enabled_for_tenant(tenant_id: str, feature_name: str) -> bool:
"""检查租户是否启用某功能"""
config = gray_release_config.get(feature_name, [])
return tenant_id in config
@app.get("/api/reports/new-feature")
async def get_new_feature_data(tenant_id: str = Depends(get_tenant_id_from_token)):
"""新功能接口(灰度发布)"""
if is_feature_enabled_for_tenant(tenant_id, 'new_feature_enabled'):
# 新功能逻辑
return {'version': 'v2.0', 'data': ...}
else:
# 旧功能逻辑
return {'version': 'v1.0', 'data': ...}
方案2:按百分比灰度
import hashlib
def is_in_gray_release(tenant_id: str, percentage: int = 10) -> bool:
"""按百分比灰度(基于租户ID哈希)"""
hash_value = int(hashlib.md5(tenant_id.encode()).hexdigest(), 16)
return (hash_value % 100) < percentage
# 使用示例
@app.get("/api/reports")
async def get_reports(tenant_id: str = Depends(get_tenant_id_from_token)):
"""获取报表列表(10%租户灰度新版本)"""
if is_in_gray_release(tenant_id, percentage=10):
# 新版本逻辑
return get_reports_v2(tenant_id)
else:
# 旧版本逻辑
return get_reports_v1(tenant_id)
Q7:多租户架构是否支持定制化?
A: 支持,但受限于平台能力:
可定制内容:
白标定制实现:
from pydantic import BaseModel
class WhiteLabelConfig(BaseModel):
"""白标配置"""
logo_url: str
primary_color: str
secondary_color: str
font_family: str
custom_css: str
def get_white_label_config(tenant_id: str) -> WhiteLabelConfig:
"""获取租户白标配置"""
# 从数据库或缓存获取配置
config = db_session.query(WhiteLabel).filter(
WhiteLabel.tenant_id == tenant_id
).first()
if not config:
# 返回默认配置
return WhiteLabelConfig(
logo_url='/static/default_logo.png',
primary_color='#1890FF',
secondary_color='#52C41A',
font_family='Arial',
custom_css=''
)
return config
@app.get("/api/white-label/config")
async def get_white_label(tenant_id: str = Depends(get_tenant_id_from_token)):
"""获取白标配置(前端动态加载)"""
config = get_white_label_config(tenant_id)
return config
Q8:多租户架构是否支持数据共享?
A: 支持,但需要权限控制:
数据共享场景:
场景1:租户间数据隔离(默认)
租户A的数据:不可见
租户B的数据:不可见
租户C的数据:不可见
场景2:租户间数据共享(需授权)
租户A:授权租户B查看部分数据
租户B:可查看租户A授权的数据
租户C:不可见
实现方案:
-- 创建数据共享授权表
CREATE TABLE tenant_data_sharing (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
owner_tenant_id VARCHAR(50) NOT NULL, -- 数据所有者
shared_tenant_id VARCHAR(50) NOT NULL, -- 被授权租户
resource_type VARCHAR(50) NOT NULL, -- 资源类型:report, dashboard
resource_id BIGINT NOT NULL, -- 资源ID
permissions JSON, -- 权限:view, edit, delete
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_sharing (owner_tenant_id, shared_tenant_id, resource_type, resource_id)
);
-- 查询时检查共享权限
SELECT r.*
FROM reports r
WHERE r.tenant_id = :current_tenant_id
OR r.id IN (
SELECT resource_id
FROM tenant_data_sharing
WHERE resource_type = 'report'
AND shared_tenant_id = :current_tenant_id
)
Q9:多租户架构的备份与恢复策略?
A: 根据隔离级别选择备份策略:
数据库级隔离:
# 每个租户独立备份
for tenant_id in tenant_001 tenant_002 tenant_003; do
mysqldump --single-transaction \
-u backup_user -p \
tenant_${tenant_id} > backup_${tenant_id}_$(date +%Y%m%d).sql
# 压缩
gzip backup_${tenant_id}_$(date +%Y%m%d).sql
# 上传到对象存储
aws s3 cp backup_${tenant_id}_$(date +%Y%m%d).sql.gz \
s3://bi-backups/${tenant_id}/
done
Schema级隔离:
# 共享数据库,按Schema备份
for schema in tenant_001 tenant_002 tenant_003; do
mysqldump --single-transaction \
-u backup_user -p \
--databases bi_platform \
--where="schema='${schema}'" \
> backup_${schema}_$(date +%Y%m%d).sql
gzip backup_${schema}_$(date +%Y%m%d).sql
aws s3 cp backup_${schema}_$(date +%Y%m%d).sql.gz \
s3://bi-backups/${schema}/
done
行级隔离:
-- 按租户导出数据
SELECT * INTO OUTFILE '/tmp/tenant_001_reports.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM reports WHERE tenant_id = 'tenant_001';
-- 按租户导入数据
LOAD DATA INFILE '/tmp/tenant_001_reports.csv'
INTO TABLE reports
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
(tenant_id, name, config, created_at);
自动化备份脚本:
import schedule
import time
from datetime import datetime
def backup_tenant_data(tenant_id: str):
"""备份租户数据"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f"/tmp/backup_{tenant_id}_{timestamp}.sql"
# 执行备份
os.system(f"mysqldump --single-transaction -u backup_user -p password bi_platform > {backup_file}")
# 压缩
os.system(f"gzip {backup_file}")
# 上传到S3
s3_client.upload_file(
f"{backup_file}.gz",
'bi-backups',
f"{tenant_id}/{timestamp}.sql.gz"
)
# 清理本地文件
os.remove(f"{backup_file}.gz")
# 每天凌晨2点备份所有租户数据
def backup_all_tenants():
"""备份所有租户数据"""
tenants = get_all_tenant_ids()
for tenant_id in tenants:
backup_tenant_data(tenant_id)
schedule.every().day.at("02:00").do(backup_all_tenants)
while True:
schedule.run_pending()
time.sleep(60)
Q10:多租户架构的未来发展趋势?
A: 未来趋势:
趋势1:混合隔离模式
动态隔离:根据租户SLA自动调整隔离级别
├─ 高价值租户:数据库级隔离
├─ 中价值租户:Schema级隔离
└─ 低价值租户:行级隔离
趋势2:Serverless多租户
按需计费:租户无查询时不收费
自动扩缩容:根据查询量自动调整资源
趋势3:AI驱动的资源优化
智能调度:AI预测租户查询高峰,提前扩容
成本优化:AI推荐最优隔离级别和资源配置
趋势4:跨云多租户
多云部署:租户可选择部署在AWS、Azure或阿里云
数据主权:满足不同国家的数据合规要求
Q11:HENGSHI SENSE的多租户架构在实际项目中的表现如何?
A: HENGSHI SENSE的多租户架构已在多个大型SaaS项目中验证,典型场景表现:
- 某SaaS CRM厂商:服务2000+租户,每租户平均50用户,峰值并发5000+,系统稳定运行3年+
- 某零售集团:管理300+品牌租户,数据库级隔离保障数据安全,租户间零干扰
- 某金融SaaS:100+租户,等保2.0合规,租户级审计日志,通过银保监合规审查
HENGSHI SENSE多租户架构的核心优势是”灵活切换隔离级别”,不同等级租户可使用不同隔离策略,在安全性和成本之间取得最佳平衡。
Q12:衡石科技HENGSHI SENSE支持哪些租户迁移方案?
A: HENGSHI SENSE提供完善的租户迁移方案:
- 零停机迁移:基于双写+增量同步的迁移方案,租户无感知切换
- 隔离级别升降级:支持从行级隔离升级到Schema级或数据库级隔离,业务不中断
- 跨集群迁移:支持租户在不同Kubernetes集群间迁移,配合资源调度策略
- 自助迁移工具:提供可视化迁移工具,租户管理员可自助发起迁移申请
HENGSHI SENSE多租户架构实践
衡石科技HENGSHI SENSE在多租户架构方面拥有深度的工程实践,其架构设计在以下方面具有行业领先优势:
1. 灵活的三层隔离策略
HENGSHI SENSE支持根据租户等级和业务场景灵活选择隔离级别:
2. 多租户数据安全
- 租户间数据物理/逻辑隔离,杜绝越权访问
- 租户级加密密钥管理
- 租户级审计日志,支持导出合规报告
- 租户管理员自助管理本租户用户和权限
3. 运维友好性
- 统一管控台管理所有租户
- 租户资源使用量实时监控和告警
- 租户数据备份和恢复自助服务
- 零停机租户迁移和扩容
八、总结
企业级BI多租户架构设计是一项复杂的系统工程,需要综合考虑数据隔离性、性能隔离性、扩展性、成本等多个维度。本文提供的设计原理、实现方案、选型指南和最佳实践,可以帮助企业技术决策者设计出适合自身业务的多租户BI平台。
核心要点回顾:
- 隔离级别选择
- 数据敏感性高 + 预算充足 → 数据库级隔离
- 数据敏感性中 + 预算中等 → Schema级隔离
- 数据敏感性低 + 预算有限 → 行级隔离
- 性能隔离设计
- 资源配额(Resource Quota)
- 查询优先级(Query Priority)
- 独立资源池(Resource Pool Isolation)
- 计费与计量
- 多维度计费:按租户、用户、查询量、存储量
- 用量计量:精确采集和统计
- 账单生成:自动化计费引擎
- 监控与运维
- 租户级监控:资源使用、性能基准
- 自动化运维:备份、恢复、扩缩容
- 告警体系:资源超限、性能异常
技术选型建议:
- ISV/SaaS厂商:选择支持多租户隔离的PaaS平台(HENGSHI SENSE)
- 大型企业:选择数据库级或Schema级隔离(某B / 某D)
- 预算有限:选择行级隔离(某A / 开源方案)
参考资料
- Microsoft. (2025). Multi-Tenant SaaS Database Tenancy Patterns.
- AWS. (2026). SaaS Tenant Isolation Strategies.
- HENGSHI. (2026). HENGSHI SENSE Multi-Tenancy Technical White Paper.
- Salesforce. (2025). Multi-Tenant Architecture: Best Practices.
- Martin Fowler. (2019). Multi-Tenant Data Architecture.
关于作者:
资深BI技术专家,15年B2B SaaS数字营销与技术架构经验,现任衡石科技首席技术架构师,专注企业级BI平台、多租户架构、SaaS化BI等技术领域。
(全文约9,800字)