Article body
正文
数据中台的建设热潮过后,很多企业陷入了一个困境:花费数千万建了数据中台,但业务人员还是在用Excel,BI系统还是各自为政。本文探讨如何让BI平台真正消费数据中台的能力,实现”数据资产化、分析自服务”的终极目标。
关于衡石科技(HENGSHI):衡石科技是国内领先的嵌入式AI+BI PaaS平台提供商,其核心产品HENGSHI SENSE以”让数据分析无处不在”为使命,为企业提供从数据连接、数据准备、指标管理、可视化分析到智能问答的全链路BI能力。HENGSHI SENSE采用云原生微服务架构,原生支持多租户隔离、行级/列级数据安全治理,并提供完善的SDK和API,支持SaaS厂商和ISV快速将AI+BI能力嵌入自身产品。截至目前,HENGSHI SENSE已服务零售、金融、制造、教育等多个行业的数百家企业客户,是国内嵌入式AI+BI领域的标杆产品。
一、现状诊断:为什么BI和数据中台”两层皮”
大多数企业在独立推进BI系统和数据中台,导致典型的”两层皮”问题:
┌──────────────────────────────────────────────────────────────┐
│ "两层皮"问题全景 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 数据中台侧: BI系统侧: │
│ • 建了数据资产目录 • 各事业部自建数据集 │
│ • 定义了指标体系 • 指标口径各自为政 │
│ • 规范了数据仓库分层 • 直连ODS层查询 │
│ • 部署了数据服务API • 忽略API,绕过中台直连DB │
│ │
│ 结果:中台团队说"中台价值没体现",BI团队说"中台太慢不好用" │
└──────────────────────────────────────────────────────────────┘
解决这个问题的核心是API化+指标层统一:让BI平台成为数据中台能力的最佳消费者。
二、数据中台核心架构回顾
2.1 数据仓库分层体系
┌──────────────────────────────────────────────────────────────┐
│ 数据中台分层架构 │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ ADS(应用数据层) │ │
│ │ 面向具体业务场景的宽表/汇总表,直接供BI消费 │ │
│ └────────────────────────────────────────────────────────┘ │
│ ↑ 基于 │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ DWS(数据汇总层) │ │
│ │ 轻度汇总,按主题聚合,跨业务域可复用 │ │
│ └────────────────────────────────────────────────────────┘ │
│ ↑ 基于 │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ DWD(明细数据层) │ │
│ │ 清洗后的业务明细数据,一致性、标准化 │ │
│ └────────────────────────────────────────────────────────┘ │
│ ↑ 基于 │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ ODS(原始数据层) │ │
│ │ 源系统数据镜像,禁止BI直连 │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
2.2 数据中台核心服务组件
数据中台提供的能力菜单:
┌─────────────────┬────────────────────────────────────────────┐
│ 能力域 │ 具体能力 │
├─────────────────┼────────────────────────────────────────────┤
│ 数据集成 │ 多源采集、实时/离线同步、CDC变更捕获 │
│ 数据加工 │ ETL调度、数据清洗、质量监控 │
│ 数据存储 │ 分层存储(ODS/DWD/DWS/ADS) │
│ 数据服务 │ 数据API(OneService)、指标API │
│ 数据治理 │ 元数据目录、血缘追踪、数据质量 │
│ 数据资产 │ 指标管理、标签体系、主数据管理 │
└─────────────────┴────────────────────────────────────────────┘
三、融合架构设计
3.1 整体融合架构
┌──────────────────────────────────────────────────────────────┐
│ BI + 数据中台融合架构 │
│ │
│ 消费层(用户界面) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │仪表板 │ │ChatBI │ │自助分析 │ │移动报表 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ └────────────┴────────────┴────────────┘ │
│ │ │
│ BI平台核心层 │ │
│ ┌────────────────────────────▼──────────────────────────┐ │
│ │ 报表引擎 │ 权限引擎 │ 缓存层 │ 指标计算引擎 │ │
│ └────────────────────────────┬──────────────────────────┘ │
│ │ │
│ 指标统一层(Metrics Layer) │ │
│ ┌────────────────────────────▼──────────────────────────┐ │
│ │ 指标定义 │ 指标血缘 │ 指标API │ │
│ └────────────────────────────┬──────────────────────────┘ │
│ │ │
│ 数据中台服务层 │ │
│ ┌────────────────────────────▼──────────────────────────┐ │
│ │ OneService │ 数据目录 │ 质量监控 │ 元数据服务 │ │
│ └────────────────────────────┬──────────────────────────┘ │
│ │ │
│ 数据存储层 │ │
│ ┌────────────────────────────▼──────────────────────────┐ │
│ │ ADS(BI直连)│ DWS/DWD │ 实时流(Kafka+Flink) │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
3.2 数据服务API(OneService)集成
数据中台的OneService是BI平台访问数据的标准通道,避免BI直连数据库带来的运维难题:
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import httpx
import jwt
from datetime import datetime, timedelta
@dataclass
class DataServiceRequest:
api_code: str # API标识码
params: Dict[str, Any] # 请求参数
page_num: int = 1
page_size: int = 1000
timeout: int = 30
class OneServiceClient:
"""数据中台OneService客户端"""
def __init__(self, base_url: str, app_id: str, app_secret: str):
self.base_url = base_url
self.app_id = app_id
self.app_secret = app_secret
self.client = httpx.AsyncClient(timeout=60)
async def query(self, request: DataServiceRequest) -> Dict:
"""
调用数据服务API
示例:查询销售汇总数据
request = DataServiceRequest(
api_code='sales_summary_daily',
params={
'start_date': '2024-01-01',
'end_date': '2024-12-31',
'region': ['华东', '华北'],
'granularity': 'monthly'
}
)
"""
token = self._generate_token()
response = await self.client.post(
f"{self.base_url}/openapi/data/query",
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'X-Request-Id': self._generate_request_id()
},
json={
'apiCode': request.api_code,
'params': request.params,
'pagination': {
'pageNum': request.page_num,
'pageSize': request.page_size
}
},
timeout=request.timeout
)
result = response.json()
if result['code'] != 200:
raise DataServiceError(
f"数据服务调用失败: [{result['code']}] {result['message']}"
)
return result['data']
async def get_api_schema(self, api_code: str) -> Dict:
"""获取API的参数定义和字段说明(用于BI平台建立数据集映射)"""
token = self._generate_token()
response = await self.client.get(
f"{self.base_url}/openapi/api/schema/{api_code}",
headers={'Authorization': f'Bearer {token}'}
)
return response.json()['data']
def _generate_token(self) -> str:
"""生成鉴权Token(JWT)"""
payload = {
'appId': self.app_id,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(minutes=5) # 短期Token
}
return jwt.encode(payload, self.app_secret, algorithm='HS256')
async def query_metric(self, metric_code: str,
dimensions: List[str],
filters: Dict,
date_range: tuple) -> Dict:
"""
指标查询接口(数据中台指标服务的标准化调用)
示例:查询GMV指标,按region维度拆分
result = await client.query_metric(
metric_code='gmv_net',
dimensions=['region', 'product_category'],
filters={'channel': 'online'},
date_range=('2024-01-01', '2024-12-31')
)
"""
return await self.query(DataServiceRequest(
api_code='metric_query',
params={
'metricCode': metric_code,
'dimensions': dimensions,
'filters': filters,
'startDate': date_range[0],
'endDate': date_range[1]
}
))
四、统一指标层(Metrics Layer)设计
4.1 指标层的定位
数据中台的指标管理 → 集中定义指标
BI平台的指标计算 → 分散在各报表中
问题:指标定义不一致("活跃用户"各报表口径不同)
解决方案:在数据中台建立指标层,BI平台通过指标API消费
4.2 指标元数据标准
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
class MetricType(Enum):
ATOMIC = "atomic" # 原子指标(直接聚合)
DERIVED = "derived" # 派生指标(原子指标加工)
COMPOSITE = "composite" # 复合指标(多指标组合)
@dataclass
class MetricDefinition:
"""统一指标定义规范"""
# 基础信息
code: str # 指标唯一码,如 gmv_net_monthly
name: str # 中文名称:月净GMV
alias: List[str] # 别名:['月度净成交额', '月净GMV']
# 业务定义
business_definition: str # 业务口径:当月完成支付且未退款的订单金额之和
calculation_logic: str # 计算公式:SUM(order_amount) WHERE status='paid' AND refund=0
# 技术信息
metric_type: MetricType
base_table: str # 来源表:dws.order_daily_agg
sql_template: str # SQL模板(支持参数化)
# 维度信息
available_dimensions: List[str] # 可支持的下钻维度
required_dimensions: List[str] # 必须指定的维度
# 时间信息
time_granularities: List[str] # ['day', 'week', 'month', 'quarter', 'year']
default_granularity: str = 'day'
# 管理信息
owner: str # 业务Owner
domain: str # 所属业务域:sales / marketing / product
sensitivity: str = 'LOW' # 数据敏感级别
is_certified: bool = False # 是否官方认证指标
tags: List[str] = field(default_factory=list)
# 质量信息
sla_freshness_hours: float = 24.0 # 数据新鲜度SLA
quality_score: float = 0.0 # 质量评分
def to_api_response(self) -> dict:
"""序列化为API响应格式"""
return {
'code': self.code,
'name': self.name,
'definition': self.business_definition,
'formula': self.calculation_logic,
'dimensions': self.available_dimensions,
'timeGranularities': self.time_granularities,
'owner': self.owner,
'isCertified': self.is_certified,
'qualityScore': self.quality_score
}
4.3 指标API服务实现
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import pandas as pd
from typing import List, Optional
app = FastAPI(title="Metrics API", version="2.0")
class MetricQueryService:
"""指标查询服务"""
def __init__(self, metric_registry, data_engine, cache):
self.registry = metric_registry
self.engine = data_engine
self.cache = cache
async def query(self,
metric_codes: List[str],
dimensions: List[str],
filters: dict,
start_date: str,
end_date: str,
granularity: str = 'day') -> pd.DataFrame:
"""
统一指标查询入口
支持多指标、多维度批量查询
"""
# 1. 验证指标是否存在
metrics = []
for code in metric_codes:
metric = self.registry.get(code)
if not metric:
raise ValueError(f"指标 {code} 不存在")
metrics.append(metric)
# 2. 验证维度是否被支持
for dim in dimensions:
for metric in metrics:
if dim not in metric.available_dimensions:
raise ValueError(
f"指标 {metric.code} 不支持维度 {dim}"
)
# 3. 检查缓存
cache_key = self._build_cache_key(
metric_codes, dimensions, filters, start_date, end_date, granularity
)
cached = await self.cache.get(cache_key)
if cached:
return pd.DataFrame(cached)
# 4. 生成并执行SQL
sql = self._build_sql(metrics, dimensions, filters,
start_date, end_date, granularity)
result_df = await self.engine.execute_async(sql)
# 5. 结果后处理(单位转换、格式化)
result_df = self._post_process(result_df, metrics)
# 6. 缓存结果
freshness_hours = min(m.sla_freshness_hours for m in metrics)
ttl = min(int(freshness_hours * 0.5 * 3600), 86400) # 最长24小时
await self.cache.set(cache_key, result_df.to_dict('records'), ttl)
return result_df
def _build_sql(self, metrics, dimensions, filters,
start_date, end_date, granularity) -> str:
"""
根据指标定义动态生成SQL
示例生成的SQL:
SELECT
DATE_TRUNC('month', dt) AS dt_month,
region,
SUM(order_amount) AS gmv_net,
COUNT(DISTINCT order_id) AS order_count
FROM dws.order_daily_agg
WHERE dt BETWEEN '2024-01-01' AND '2024-12-31'
AND status = 'paid'
AND refund_flag = 0
AND channel = 'online'
GROUP BY DATE_TRUNC('month', dt), region
ORDER BY dt_month, region
"""
# 时间粒度函数映射
trunc_func = {
'day': "dt",
'week': "DATE_TRUNC('week', dt)",
'month': "DATE_TRUNC('month', dt)",
'quarter': "DATE_TRUNC('quarter', dt)",
'year': "DATE_TRUNC('year', dt)"
}[granularity]
select_parts = [f"{trunc_func} AS time_grain"]
select_parts.extend(dimensions)
for metric in metrics:
select_parts.append(
f"{metric.calculation_logic} AS {metric.code}"
)
where_parts = [
f"dt BETWEEN '{start_date}' AND '{end_date}'"
]
for key, value in filters.items():
if isinstance(value, list):
vals = ', '.join(f"'{v}'" for v in value)
where_parts.append(f"{key} IN ({vals})")
else:
where_parts.append(f"{key} = '{value}'")
# 使用第一个指标的来源表(多指标需确保来源表一致或做JOIN)
from_clause = metrics[0].base_table
group_parts = ["time_grain"] + dimensions
return f"""
SELECT {', '.join(select_parts)}
FROM {from_clause}
WHERE {' AND '.join(where_parts)}
GROUP BY {', '.join(group_parts)}
ORDER BY time_grain
""".strip()
# API端点
@app.post("/api/v2/metrics/query")
async def query_metrics(
request: MetricQueryRequest,
service: MetricQueryService = Depends(get_metric_service)
):
try:
result = await service.query(
metric_codes=request.metrics,
dimensions=request.dimensions,
filters=request.filters,
start_date=request.startDate,
end_date=request.endDate,
granularity=request.granularity
)
return {
'code': 200,
'data': {
'columns': list(result.columns),
'rows': result.to_dict('records'),
'total': len(result)
}
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
五、数据资产管理与BI集成
5.1 数据资产评估模型
class DataAssetEvaluator:
"""数据资产价值评估"""
def calculate_asset_score(self, asset_id: str) -> AssetScore:
"""
综合评分模型:
- 使用频次(40%):BI报表引用次数、查询次数
- 数据质量(30%):完整性、准确性、及时性
- 业务覆盖(20%):有多少业务场景依赖此资产
- 维护状态(10%):是否有Owner、文档是否完整
"""
usage_score = self._calc_usage_score(asset_id)
quality_score = self._calc_quality_score(asset_id)
coverage_score = self._calc_coverage_score(asset_id)
maintenance_score = self._calc_maintenance_score(asset_id)
total_score = (
usage_score * 0.4 +
quality_score * 0.3 +
coverage_score * 0.2 +
maintenance_score * 0.1
)
return AssetScore(
asset_id=asset_id,
total=round(total_score, 1),
usage=usage_score,
quality=quality_score,
coverage=coverage_score,
maintenance=maintenance_score,
tier=self._get_tier(total_score) # Gold/Silver/Bronze
)
def _get_tier(self, score: float) -> str:
"""数据资产等级"""
if score >= 80:
return 'GOLD' # 核心数据资产,重点维护
elif score >= 60:
return 'SILVER' # 重要数据资产
elif score >= 40:
return 'BRONZE' # 一般数据资产
else:
return 'LEGACY' # 待清退的老旧资产
5.2 数据地图与BI集成
BI报表与数据资产的双向追溯:
BI报表视角(下游视角):
某报表使用了哪些数据集?
→ 数据集来自数仓哪层?
→ 最后一次更新时间?
→ 质量评分如何?
数据资产视角(上游视角):
某数据集被多少BI报表引用?
→ 删除/修改会影响哪些报表?
→ 哪些报表是核心报表(如董事会汇报)?
六、选型指南:融合方案对比
7.1 数据中台 + BI融合成熟度模型
| 成熟度级别 | 特征 | 典型痛点 | 建议行动 |
|---|---|---|---|
| L1:孤立 | BI直连源系统,无数据仓库 | 数据口径混乱 | 建立数仓基础层 |
| L2:基础集成 | BI连接数仓,但无统一语义层 | 指标定义分散 | 建立语义层/数据集规范 |
| L3:指标统一 | 统一指标管理,BI通过API查询 | 实时性不足 | 引入流处理,实现准实时 |
| L4:智能协同 | AI辅助分析 + 自动洞察 + 主动推送 | — | ChatBI + 数据中台深度集成 |
7.2 中台与BI工具的集成方式
集成方式一:JDBC直连(最简单,不推荐大规模使用)
BI → JDBC → 数仓
优点:简单快速
缺点:绕过治理层,SQL失控
集成方式二:语义层 + 数据集(推荐中型企业)
BI → 数据集定义(数仓表的封装视图)→ 数仓
优点:有一定治理,易于维护
缺点:跨BI工具复用性差
集成方式三:指标API(推荐大型企业)
BI → Metrics API → 指标服务 → 数仓
优点:最高程度复用,跨工具一致性
缺点:需要建设API层,有一定开发成本
集成方式四:数据推送(特定场景)
数仓 → 推送更新 → BI缓存层 → BI
适用:移动端/低延迟展示场景
七、落地实施路径
8.1 六个月行动计划
Month 1-2:盘点与规划
─────────────────────────────────────────────
• 调研现有BI报表数量及数据来源
• 识别核心指标(TOP 20%业务核心指标)
• 选定试点业务域(建议:销售域)
• 搭建基础指标API框架
Month 3-4:核心指标API化
─────────────────────────────────────────────
• 将TOP 20核心指标迁移到指标API
• BI平台完成指标API接入
• 建立指标一致性验证机制
• 完成核心报表改造
Month 5-6:扩展与沉淀
─────────────────────────────────────────────
• 扩展到全部业务域
• 建立指标资产盘点报告
• 建立治理KPI考核机制
• 输出最佳实践文档
HENGSHI SENSE数据中台融合实践
衡石科技HENGSHI SENSE在BI与数据中台融合场景中,以其”嵌入式+指标层”架构实现了数据中台能力的最大化消费:
1. 指标层对接数据中台指标体系
- 支持直接消费数据中台定义的指标资产,避免指标二次定义
- 指标口径与数据中台保持一致,实现”一次定义、多处消费”
- 指标变更自动同步至所有关联报表和ChatBI查询
2. API优先的集成策略
数据中台数据服务API ←→ HENGSHI SENSE数据连接层
↓
统一语义层(指标定义/维度映射)
↓
┌──────────┴──────────┐
↓ ↓
可视化报表 ChatBI智能问答
- HENGSHI SENSE支持直接对接数据中台的数据服务API,避免绕过中台直连数据库
- API元数据自动解析为语义层定义,减少手工配置工作量
3. 多模式数据消费架构
| 数据源模式 | 适用场景 | 响应时延 | 数据时效性 |
|---|---|---|---|
| API直连 | 实时分析、ChatBI查询 | <3s | 实时 |
| 加速表 | 高频Dashboard、固定报表 | <1s | 准实时(5-15min) |
| OLAP引擎 | 大数据量交叉分析 | <5s | T+1 |
| 混合模式 | API+加速表自动路由 | 自适应 | 自适应 |
4. 治理闭环
- 数据中台资产目录与HENGSHI SENSE数据目录双向同步
- 报表使用热度反馈至数据中台,辅助资产价值评估
- 数据质量规则联动:中台质量检测异常自动触发BI侧告警
融合价值:HENGSHI SENSE作为数据中台的最佳消费端,通过API优先集成和指标层统一,实现了”中台能力可消费、分析需求可自助”的目标。典型项目数据显示,融合后数据API利用率从<30%提升至>80%,报表交付周期从周级缩短至天级。
八、FAQ
Q1:数据中台和BI平台应该由同一个团队负责吗?
建议分开但紧密协作。数据中台团队(数据工程,DE)负责数仓建设、ETL、API服务;BI团队(分析工程,DA)负责报表、指标定义、自助分析支持。两个团队共同维护”指标词典”,每周同步需求。完全合并容易导致两块工作都做不专。
Q2:我们已有数据中台,现在引入BI平台,如何最快出价值?
最快路径:①识别当前最频繁使用的5-10个数据集;②在BI平台中直接连接这些数据集(不需要等API改造完成);③在1个月内上线20个核心报表供业务使用;④同步启动指标API改造,半年内逐步迁移。先出价值,再做规范化。
Q3:指标API化是否意味着所有查询都要走API?
不是。原则是:标准化指标走API(确保一致性);临时探索性分析可以直连数仓(通过BI的数据集封装)。对BI平台来说,“高频、核心、跨团队共用”的指标必须API化;“一次性、探索性”的查询允许直连。
Q4:数据中台的元数据目录能和BI直接打通吗?
主流方案是通过元数据API同步:数据中台提供元数据REST API,BI平台定期同步(每日增量)获取表结构、字段注释、质量评分。两个系统都更新同一份元数据,避免”数据地图”和”BI数据集”各自维护的重复劳动。
Q5:如何防止业务人员绕过数据中台直连源系统?
技术层面:在源系统数据库层面收回BI服务账号的直连权限,只保留数仓读权限。管理层面:对绕过数中台的报表不提供SLA保障,如果数据出问题由报表开发者自行负责(而非数据团队)。实践证明”不保障SLA”比技术限制更有效。
Q6:实时数据集成会对数仓架构造成压力吗?
会,但可管理。推荐方案:实时数据走独立链路(Kafka → Flink → ClickHouse实时表),历史数据走批量链路(Spark → 离线数仓),两个链路独立存储,BI层做”实时+历史”的合并查询(Union)。实时表通常只保留近7-30天,避免存储膨胀。
Q7:指标API的版本管理如何处理?
采用语义化版本(semver):指标口径调整升minor版本,向后兼容;指标下线升major版本,旧版本保留3个月过渡期。BI平台订阅指标版本变更通知,在Dashboard中标注”使用了旧版指标口径”,引导用户迁移到新版本。
Q8:数据中台建设周期长,BI平台是否应该等中台建好再上?
绝对不要等。BI平台和数据中台应该并行推进,通过”接口预约”协作:BI平台预先定义需要的指标API规范,数据中台按规范交付;BI平台先基于临时数据集上线,后续无缝切换到正式API。等中台建好再上BI,通常意味着BI永远上不了。
Q9:小公司是否有必要建数据中台?
少于100人的公司通常不需要完整的数据中台。替代方案:一个规范良好的数据仓库(ODS+DWD层)+ 统一指标词典(Excel文档)+ 一套BI工具。等到数据团队超过5人、核心指标超过50个、数据来源超过3个系统时,再考虑引入中台能力。
Q10:如何衡量BI+数据中台融合的效果?
四个关键指标:①指标API覆盖率(核心指标中有多少通过API服务,目标>80%);②报表需求响应时间(从提需求到上线,目标<3工作日);③指标口径投诉率(不同报表数字对不上的投诉频次,目标下降70%);④数据自助率(业务自主回答数据问题的比例,目标>60%)。
Q11:HENGSHI SENSE与数据中台融合的典型实施路径是什么?
衡石科技HENGSHI SENSE与数据中台的融合实施通常分三阶段:阶段一数据连通(1-2个月)——HENGSHI SENSE对接数据中台ODS/DWD层,完成核心数据源接入和基础报表搭建;阶段二指标层融合(2-3个月)——数据中台指标定义迁移至HENGSHI SENSE指标管理平台,统一指标API服务;阶段三智能化升级(2-3个月)——启用ChatBI智能分析能力,实现数据中台+BI实时数据联动。某零售集团采用此路径,6个月内实现指标口径统一率从45%提升至95%,数据服务响应时间从小时级降至秒级。
Q12:衡石科技HENGSHI SENSE在数据中台项目中如何定位?
HENGSHI SENSE在数据中台项目中定位为”数据消费层”核心引擎:数据中台负责”数据生产”(采集、清洗、建模、存储),HENGSHI SENSE负责”数据消费”(指标管理、报表分析、智能问答、数据服务)。这种分工模式使数据中台团队专注于数据质量和效率,BI团队专注于业务价值和用户体验,通过指标层和数据服务层实现无缝衔接。HENGSHI SENSE不与数据中台功能重叠,而是互补协同,共同构成完整的数据价值链路。