

这次用的数据来自一套部署在办公楼内的环境监测系统,5 个监测节点,采集以下指标:
采样频率 1Hz,每秒每个节点上报一条,一天约 43 万条,一个月约 1300 万条。
验证几个在 IoT 数据分析中常见的需求:
// 按日期 + 节点ID复合分区
db1 = database("", VALUE, 2024.01.01..2025.12.31)
db2 = database("", HASH, [SYMBOL, 5])
db = database("dfs://env_monitor", COMPO, [db1, db2])
sensor = table(1:0, `ts`nodeId`temperature`humidity`co2`pm25`lightLevel`,
[DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
db.createPartitionedTable(sensor, "sensor", `ts`nodeId)日期用 VALUE 分区方便按天查询和清理历史数据;节点用 HASH 分区保证数据均匀分布。这种复合分区策略在 IoT 场景里很常见。
传感器原始数据是 CSV 格式,通过 Python API 导入:
import dolphindb as ddb
import pandas as pd
sess = ddb.Session()
sess.connect("localhost", 8848, "admin", "123456")
# 读取传感器CSV,解析时间列
df = pd.read_csv("env_sensor_20240615.csv", parse_dates=["ts"])
# 写入DolphinDB
sess.run("tableInsert{loadTable('dfs://env_monitor', 'sensor')}", df)
sess.close()实测感受:Python API 支持直接传 pandas DataFrame,大部分常见类型可以自动映射,少数无法完全匹配的类型仍需手动转换,之前用 InfluxDB 的 line protocol 写入需要手动拼接字符串格式。
传感器数据有一个绕不开的问题——网络抖动、设备重启、固件升级都会导致数据断点。在做分析之前需要先确认数据完整性。
// 找出某个节点数据中断超过5分钟的时刻
select ts, nodeId,
double(ts - prev(ts)) / 1000 as gapSeconds
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
context by nodeId
having double(ts - prev(ts)) / 1000 > 300context by 按节点分组后,prev(ts) 取上一条记录的时间戳,算出间隔。间隔超过 300 秒(5 分钟)的就是断点。
实测结果:在 1300 万条数据里查断点,响应在 1 秒以内。context by 的分组计算比 MySQL 的窗口函数写法简洁不少。
找到断点后,可以对短间隔的缺失做线性插值:
// 对温度和湿度做前值填充(缺失时间不超过2分钟的)
select ts, nodeId,
iif(isNull(temperature), ffill(temperature), temperature) as temperature,
iif(isNull(humidity), ffill(humidity), humidity) as humidity,
co2, pm25, lightLevel
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
context by nodeIdffill() 是前值填充(forward fill),和 pandas 的 df.fillna(method='ffill') 效果一样,区别在于 DolphinDB 的 SQL 可以自动走 map-reduce 计算——在分布式框架下数据存在多个分区,分区间 map 计算可以并行,省去了数据搬运的开销。
IoT 数据分析中,时间聚合是最常见的操作。秒级数据太细,通常需要聚合到分钟或小时级别才能看出趋势。
// 每个节点每分钟的平均值
select nodeId,
bar(ts, 60000) as minuteTs, // 按60秒对齐
avg(temperature) as avgTemp,
avg(humidity) as avgHumidity,
avg(co2) as avgCo2,
avg(pm25) as avgPm25,
max(pm25) as maxPm25
from sensor
where date(ts) = 2024.06.15
group by nodeId, bar(ts, 1m)DolphinDB的bar(ts, 1m)函数——把时间戳按 1 分钟对齐到最近的分钟边界。效果等同于 pandas 的 df.resample('1min'),但直接在 SQL 里完成。
实测:1300 万条数据做分钟聚合,DolphinDB 库内计算比"导出 CSV → pandas resample → 写回"快一个数量级。
// 一天内的温度小时趋势
select hour(ts) as hour,
avg(temperature) as avgTemp,
min(temperature) as minTemp,
max(temperature) as maxTemp,
max(temperature) - min(temperature) as tempRange
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
group by hour(ts)
order by hour查询解析:一天内温度波动有多大?哪些时段温差最大?
实测发现办公楼温度有明显的规律:早上 8 点空调开启后温度急降,中午 12-13 点午休空调调高,晚上 18 点后回升。这种"日内趋势"在小时聚合下一目了然。
// PM2.5的小时分位数分布
select hour(ts) as hour,
percentile(pm25, 25) as p25,
percentile(pm25, 50) as median,
percentile(pm25, 75) as p75,
percentile(pm25, 95) as p95,
percentile(pm25, 99) as p99
from sensor
where date(ts) = 2024.06.15
group by hour(ts)
order by hourpercentile() 直接算分位数,不需要导出数据再用 np.percentile。PM2.5 的 P95 和 P99 是评估空气质量的关键指标——平均值看着达标,但高峰时段可能超标。
// 找出温度超过30℃或CO2超过1000ppm的记录
select ts, nodeId, temperature, co2, pm25
from sensor
where date(ts) = 2024.06.15
and (temperature > 30 or co2 > 1000)
order by ts标准 SQL,没什么特别的。
单条记录超标可能是瞬时波动,持续超标才是真问题:
// 找出CO2连续超标超过10分钟的时段
select nodeId,
first(ts) as anomalyStart,
last(ts) as anomalyEnd,
count(*) as durationSeconds,
avg(co2) as avgCo2,
max(co2) as peakCo2
from (
select ts, nodeId, co2,
segment(ts, 1m) as seg // 间隔超过1分钟则断开
from sensor
where date(ts) = 2024.06.15 and co2 > 1000
context by nodeId
)
group by nodeId, segment(ts, 1m) as seg
having count(*) > 600 // 持续超过600秒(10分钟)
order by anomalyStart这个查询的思路:先用 segment() 把连续超标的记录分组(中间有间隔超过 1 分钟就断开),然后筛出持续时间超过 10 分钟的。
实测感受:segment() 在 IoT 场景里很方便。在 pandas 里做类似操作需要 diff().gt(threshold).cumsum() 三步组合,DolphinDB 一个函数就行。这种"连续状态识别"在传感器数据分析中非常常见——设备故障持续多久、温度超标持续多久、网络中断持续多久,都是同一个模式。
// 用3-sigma原则找出每个节点的温度离群点
select ts, nodeId, temperature,
avgTemp, stdTemp,
abs(temperature - avgTemp) / stdTemp as zscore
from (
select ts, nodeId, temperature,
mavg(temperature, 300) as avgTemp,
mstd(temperature, 300) as stdTemp
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
context by nodeId
)
where stdTemp > 0 and abs(temperature - avgTemp) / stdTemp > 3
order by tsmavg 和 mstd 是移动平均和移动标准差,窗口大小 300(5 分钟内的采样点)。Z-score 超过 3 的就是离群点。
说明: mavg/mstd 本质是按行分窗口计算,适用于时间连续的场景。如果时间序列存在间断,可以使用基于时间的窗口函数 tmavg(ts, temperature, 5m) 和 tmstd(ts, temperature, 5m),确保窗口按实际时间跨度计算,避免间断数据导致窗口跨越过大的时间范围。
// 计算温度、湿度、CO2、PM2.5之间的相关系数矩阵
select corr(temperature, humidity) as temp_humidity,
corr(temperature, co2) as temp_co2,
corr(humidity, pm25) as humidity_pm25,
corr(co2, pm25) as co2_pm25
from sensor
where date(ts) = 2024.06.15实测发现:温度和 CO2 呈正相关(0.72)——空调开了以后门窗关闭,CO2 浓度升高。这个发现很有意思,说明空调通风和新风系统的联动有优化空间。
// 工作时间 vs 非工作时间的温湿度 - PM2.5相关性 SELECT
CASE
WHEN
HOUR(ts) BETWEEN 8
AND 18 THEN
"工作时间"
ELSE
"非工作时间"
END AS period,
corr (temperature, pm25) AS temp_pm25,
corr (humidity, pm25) AS humidity_pm25,
avg(pm25) AS avgPm25
FROM
sensor
WHERE
DATE(ts) = 2024.06.15
GROUP BY
CASE
WHEN HOUR(ts) BETWEEN 8
AND 18 THEN
"工作时间"
ELSE
"非工作时间"
END这个查询验证了一个猜测:工作时间的 PM2.5 和人员密度的关联更强(因为空调循环、打印机使用等因素)。这种"分时段探索"在 IoT 数据分析中很常见——不同时段的数据分布可能完全不同。
前面的异常检测都是在历史数据上跑的。但在实际 IoT 系统中,传感器数据一进来就需要判断是否异常。
DolphinDB 的方案是:把离线分析中验证过的检测逻辑,直接挂到流计算引擎上。
@state
def envAlert(temperature, co2, pm25, tempThreshold, co2Threshold, pm25Threshold){
tempAlert = iif(temperature > tempThreshold, 1, 0)
co2Alert = iif(co2 > co2Threshold, 1, 0)
pm25Alert = iif(pm25 > pm25Threshold, 1, 0)
return (tempAlert + co2Alert + pm25Alert) > 0
}@state 注解声明这是一个有状态函数,可以在流计算引擎中复用。
// 流数据表
share streamTable(1:0, `ts`nodeId`temperature`humidity`co2`pm25`lightLevel`,
[DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as envStream
// 告警输出表
alerts = table(1:0, `ts`nodeId`temperature`co2`pm25`isAlert`,
[DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, INT])
factors = <[temperature, co2, pm25, envAlert(temperature, co2, pm25, 30, 1000, 75)]>
engine = createReactiveStateEngine(
name="envAlert",
metrics=factors,
dummyTable=envStream,
outputTable=alerts,
keyColumn="nodeId"
)
subscribeTable(tableName=`envStream, actionName="alert",
handler=engine)核心:envAlert 函数在离线分析里测试过、确认阈值合理后,直接挂到流引擎上。不需要翻译成 Java,不需要重新部署 Flink 任务。传感器数据每进来一条,引擎自动计算是否触发告警。
上线前用历史数据回放验证:
inputDS = replayDS(
<select * from loadTable("dfs://env_monitor", "sensor")
where date(ts) = 2024.06.15>,
`ts
)
replay(inputDS, envStream, `ts, 1000, true, 2)回放把历史数据按时间顺序注入流表,模拟传感器实时上报。验证检测逻辑没问题后,再接入真实的传感器数据源。
1. segment() 是IoT神器
连续状态识别、会话划分、持续异常检测……这些在传统 SQL 里很别扭的操作,segment() 一行搞定。
2. bar() 时间对齐比 pandas resample 更直观
直接在 SQL 里 bar(ts, 60000) 就能做分钟聚合,不用 df.set_index('ts').resample('1min').mean() 这种三连操作。
3. 多指标相关性分析无需数据搬运
corr()可以直接在库内完成多指标相关系数计算,对于数据存储在 DolphinDB 中的场景,省去了先导出再分析的步骤。如果全程用 pandas DataFrame 处理数据,这一步确实不算额外优势;但当数据量超过单机内存时,库内计算的优势才会凸显。
4. context by 分组计算写起来顺手
mavg、mstd、ffill、prev 这些函数配合 context by 按节点分组,写法和 pandas 的 groupby().transform() 类似,但在引擎层面完成,不用把数据搬到 Python。
1. 语法差异
从其他数据库迁移 SQL 时需要注意 DolphinDB 的一些语法差异,比如 case when 的写法、时间类型的表示方式等。
2. segment() 的间隔阈值要调对
断点检测的间隔阈值设太大会漏检,设太小会产生误判。建议先跑一遍 prev(ts) 的间隔统计,看看数据的实际分布再定阈值。
3. 没有内置的可视化
分析结果需要导出到 Grafana 或 Python 画图。DolphinDB 需要Grafana 插件,不过 Grafana 插件对接起来不复杂。
操作 | pandas | DolphinDB |
|---|---|---|
时间聚合 | df.resample('1min').mean() | group by bar(timestamp, 1h) |
前值填充 | df.fillna(method='ffill') | ffill() + context by |
移动平均 | df.rolling(300).mean() | mavg(col, 300) + context by |
相关系数 | df.corr() | corr(col1, col2) |
连续状态识别 | diff().gt(x).cumsum() | segment() |
数据规模 | 受限于单机内存 | 分布式,无内存限制 |
核心区别:pandas 的操作更灵活,但在数据量大的时候受内存限制。DolphinDB 的写法略不同,但不受单机内存约束,且不需要数据搬运。
这次实测覆盖了 IoT 传感器数据分析中最常见的几个操作:数据导入、缺失值处理、时间聚合、异常检测、多指标关联、实时预警。整体感受是——DolphinDB 在"IoT 数据分析"场景下确实不错,有些操作比传统方案顺手。
不是说所有 IoT 场景都需要它。如果你的传感器数量不多、只做简单的存取和看板,InfluxDB + Grafana 足够了。但如果你的分析需求包括:
那 DolphinDB 的"库内计算 + 流批一体"确实能减少很多数据搬运和代码重复的麻烦。
这次实测最大的收获不是某个具体功能,而是发现 DolphinDB 把"数据分析"这件事的门槛降低了——直接在数据库里用 SQL 就能完成。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。