首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >DolphinDB物联网实测手记:用环境传感器数据跑通时序分析的完整链路

DolphinDB物联网实测手记:用环境传感器数据跑通时序分析的完整链路

原创
作者头像
Xxtaoaooo
修改2026-05-31 13:25:01
修改2026-05-31 13:25:01
750
举报
文章被收录于专栏:应用实践应用实践

一、背景:环境传感器数据分析

1.1 测试数据

这次用的数据来自一套部署在办公楼内的环境监测系统,5 个监测节点,采集以下指标:

  • 温度(℃)
  • 湿度(%RH)
  • CO2 浓度(ppm)
  • PM2.5(μg/m³)
  • 光照强度(lux)

采样频率 1Hz,每秒每个节点上报一条,一天约 43 万条,一个月约 1300 万条。

1.2 测试目标

验证几个在 IoT 数据分析中常见的需求:

  1. 数据入库:传感器原始 CSV 数据怎么导入
  2. 缺失值处理:传感器数据难免有断点,怎么补
  3. 时间聚合:秒级数据怎么按分钟/小时汇总
  4. 异常检测:温度/CO2 超标怎么筛
  5. 多指标关联:温湿度和空气质量有没有相关性
  6. 实时预警:能不能在数据进来的时候就判断异常

1.3 测试环境

  • DolphinDB 单节点部署
  • Python 3.11 + DolphinDB Python API
  • 数据规模:约 1300 万条环境传感器记录

二、数据入库:从 CSV 到分布式时序表

2.1 建表

代码语言:javascript
复制
// 按日期 + 节点ID复合分区
db1 = database("", VALUE, 2024.01.01..2025.12.31)
db2 = database("", HASH, [SYMBOL, 5])
db = database("dfs://env_monitor", COMPO, [db1, db2])

sensor = table(1:0, `ts`nodeId`temperature`humidity`co2`pm25`lightLevel`,
               [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
db.createPartitionedTable(sensor, "sensor", `ts`nodeId)

日期用 VALUE 分区方便按天查询和清理历史数据;节点用 HASH 分区保证数据均匀分布。这种复合分区策略在 IoT 场景里很常见。

2.2 Python 导入

传感器原始数据是 CSV 格式,通过 Python API 导入:

代码语言:javascript
复制
import dolphindb as ddb
import pandas as pd

sess = ddb.Session()
sess.connect("localhost", 8848, "admin", "123456")

# 读取传感器CSV,解析时间列
df = pd.read_csv("env_sensor_20240615.csv", parse_dates=["ts"])

# 写入DolphinDB
sess.run("tableInsert{loadTable('dfs://env_monitor', 'sensor')}", df)
sess.close()

实测感受:Python API 支持直接传 pandas DataFrame,大部分常见类型可以自动映射,少数无法完全匹配的类型仍需手动转换,之前用 InfluxDB 的 line protocol 写入需要手动拼接字符串格式。


三、缺失值处理:传感器数据的"断点"问题

传感器数据有一个绕不开的问题——网络抖动、设备重启、固件升级都会导致数据断点。在做分析之前需要先确认数据完整性。

3.1 检测数据断点

代码语言:javascript
复制
// 找出某个节点数据中断超过5分钟的时刻
select ts, nodeId,
       double(ts - prev(ts)) / 1000 as gapSeconds
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
context by nodeId
having double(ts - prev(ts)) / 1000 > 300

context by 按节点分组后,prev(ts) 取上一条记录的时间戳,算出间隔。间隔超过 300 秒(5 分钟)的就是断点。

实测结果:在 1300 万条数据里查断点,响应在 1 秒以内。context by 的分组计算比 MySQL 的窗口函数写法简洁不少。

3.2 线性插值补缺

找到断点后,可以对短间隔的缺失做线性插值:

代码语言:javascript
复制
// 对温度和湿度做前值填充(缺失时间不超过2分钟的)
select ts, nodeId,
       iif(isNull(temperature), ffill(temperature), temperature) as temperature,
       iif(isNull(humidity), ffill(humidity), humidity) as humidity,
       co2, pm25, lightLevel
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
context by nodeId

ffill() 是前值填充(forward fill),和 pandas 的 df.fillna(method='ffill') 效果一样,区别在于 DolphinDB 的 SQL 可以自动走 map-reduce 计算——在分布式框架下数据存在多个分区,分区间 map 计算可以并行,省去了数据搬运的开销。


四、时间聚合:从秒级到分钟/小时级

IoT 数据分析中,时间聚合是最常见的操作。秒级数据太细,通常需要聚合到分钟或小时级别才能看出趋势。

4.1 按分钟聚合

代码语言:javascript
复制
// 每个节点每分钟的平均值
select nodeId,
       bar(ts, 60000) as minuteTs,  // 按60秒对齐
       avg(temperature) as avgTemp,
       avg(humidity) as avgHumidity,
       avg(co2) as avgCo2,
       avg(pm25) as avgPm25,
       max(pm25) as maxPm25
from sensor
where date(ts) = 2024.06.15
group by nodeId, bar(ts, 1m)

DolphinDB的bar(ts, 1m)函数——把时间戳按 1 分钟对齐到最近的分钟边界。效果等同于 pandas 的 df.resample('1min'),但直接在 SQL 里完成。

实测:1300 万条数据做分钟聚合,DolphinDB 库内计算比"导出 CSV → pandas resample → 写回"快一个数量级。

4.2 按小时聚合 + 日内趋势

代码语言:javascript
复制
// 一天内的温度小时趋势
select hour(ts) as hour,
       avg(temperature) as avgTemp,
       min(temperature) as minTemp,
       max(temperature) as maxTemp,
       max(temperature) - min(temperature) as tempRange
from sensor
where nodeId = "N001" and date(ts) = 2024.06.15
group by hour(ts)
order by hour

查询解析:一天内温度波动有多大?哪些时段温差最大?

实测发现办公楼温度有明显的规律:早上 8 点空调开启后温度急降,中午 12-13 点午休空调调高,晚上 18 点后回升。这种"日内趋势"在小时聚合下一目了然。

4.3 分位数统计

代码语言:javascript
复制
// PM2.5的小时分位数分布
select hour(ts) as hour,
       percentile(pm25, 25) as p25,
       percentile(pm25, 50) as median,
       percentile(pm25, 75) as p75,
       percentile(pm25, 95) as p95,
       percentile(pm25, 99) as p99
from sensor
where date(ts) = 2024.06.15
group by hour(ts)
order by hour

percentile() 直接算分位数,不需要导出数据再用 np.percentile。PM2.5 的 P95 和 P99 是评估空气质量的关键指标——平均值看着达标,但高峰时段可能超标。


五、异常检测:温度和CO2超标筛选

5.1 简单阈值筛选

代码语言:javascript
复制
// 找出温度超过30℃或CO2超过1000ppm的记录
select ts, nodeId, temperature, co2, pm25
from sensor
where date(ts) = 2024.06.15
    and (temperature > 30 or co2 > 1000)
order by ts

标准 SQL,没什么特别的。

5.2 持续异常检测:连续超标N分钟

单条记录超标可能是瞬时波动,持续超标才是真问题:

代码语言:javascript
复制
// 找出CO2连续超标超过10分钟的时段
select nodeId,
       first(ts) as anomalyStart,
       last(ts) as anomalyEnd,
       count(*) as durationSeconds,
       avg(co2) as avgCo2,
       max(co2) as peakCo2
from (
    select ts, nodeId, co2,
           segment(ts, 1m) as seg  // 间隔超过1分钟则断开
    from sensor
    where date(ts) = 2024.06.15 and co2 > 1000
    context by nodeId
)
group by nodeId, segment(ts, 1m) as seg
having count(*) > 600  // 持续超过600秒(10分钟)
order by anomalyStart

这个查询的思路:先用 segment() 把连续超标的记录分组(中间有间隔超过 1 分钟就断开),然后筛出持续时间超过 10 分钟的。

实测感受segment() 在 IoT 场景里很方便。在 pandas 里做类似操作需要 diff().gt(threshold).cumsum() 三步组合,DolphinDB 一个函数就行。这种"连续状态识别"在传感器数据分析中非常常见——设备故障持续多久、温度超标持续多久、网络中断持续多久,都是同一个模式。

5.3 离群点检测:3-sigma

代码语言:javascript
复制
// 用3-sigma原则找出每个节点的温度离群点
select ts, nodeId, temperature,
       avgTemp, stdTemp,
       abs(temperature - avgTemp) / stdTemp as zscore
from (
    select ts, nodeId, temperature,
           mavg(temperature, 300) as avgTemp,
           mstd(temperature, 300) as stdTemp
    from sensor
    where nodeId = "N001" and date(ts) = 2024.06.15
    context by nodeId
)
where stdTemp > 0 and abs(temperature - avgTemp) / stdTemp > 3
order by ts

mavgmstd 是移动平均和移动标准差,窗口大小 300(5 分钟内的采样点)。Z-score 超过 3 的就是离群点。

说明: mavg/mstd 本质是按行分窗口计算,适用于时间连续的场景。如果时间序列存在间断,可以使用基于时间的窗口函数 tmavg(ts, temperature, 5m) 和 tmstd(ts, temperature, 5m),确保窗口按实际时间跨度计算,避免间断数据导致窗口跨越过大的时间范围。


六、多指标关联:温度、湿度和空气质量的关系

6.1 相关系数

代码语言:javascript
复制
// 计算温度、湿度、CO2、PM2.5之间的相关系数矩阵
select corr(temperature, humidity) as temp_humidity,
       corr(temperature, co2) as temp_co2,
       corr(humidity, pm25) as humidity_pm25,
       corr(co2, pm25) as co2_pm25
from sensor
where date(ts) = 2024.06.15

实测发现:温度和 CO2 呈正相关(0.72)——空调开了以后门窗关闭,CO2 浓度升高。这个发现很有意思,说明空调通风和新风系统的联动有优化空间。

6.2 分时段相关性

代码语言:javascript
复制
// 工作时间 vs 非工作时间的温湿度 - PM2.5相关性 SELECT
  CASE
    WHEN
      HOUR(ts) BETWEEN 8
      AND 18 THEN
      "工作时间"
    ELSE
      "非工作时间"
  END AS period,
  corr (temperature, pm25) AS temp_pm25,
  corr (humidity, pm25) AS humidity_pm25,
  avg(pm25) AS avgPm25
FROM
  sensor
WHERE
  DATE(ts) = 2024.06.15
GROUP BY
  CASE
    WHEN HOUR(ts) BETWEEN 8
      AND 18 THEN
      "工作时间"
    ELSE
      "非工作时间"
  END

这个查询验证了一个猜测:工作时间的 PM2.5 和人员密度的关联更强(因为空调循环、打印机使用等因素)。这种"分时段探索"在 IoT 数据分析中很常见——不同时段的数据分布可能完全不同。


七、实时预警:把离线逻辑搬到流计算上

前面的异常检测都是在历史数据上跑的。但在实际 IoT 系统中,传感器数据一进来就需要判断是否异常。

DolphinDB 的方案是:把离线分析中验证过的检测逻辑,直接挂到流计算引擎上

7.1 定义检测函数

代码语言:javascript
复制
@state
def envAlert(temperature, co2, pm25, tempThreshold, co2Threshold, pm25Threshold){
    tempAlert = iif(temperature > tempThreshold, 1, 0)
    co2Alert = iif(co2 > co2Threshold, 1, 0)
    pm25Alert = iif(pm25 > pm25Threshold, 1, 0)
    return (tempAlert + co2Alert + pm25Alert) > 0
}

@state 注解声明这是一个有状态函数,可以在流计算引擎中复用。

7.2 挂载到流引擎

代码语言:javascript
复制
// 流数据表
share streamTable(1:0, `ts`nodeId`temperature`humidity`co2`pm25`lightLevel`,
                  [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as envStream

// 告警输出表
alerts = table(1:0, `ts`nodeId`temperature`co2`pm25`isAlert`,
               [DATETIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, INT])

factors = <[temperature, co2, pm25, envAlert(temperature, co2, pm25, 30, 1000, 75)]>
engine = createReactiveStateEngine(
    name="envAlert",
    metrics=factors,
    dummyTable=envStream,
    outputTable=alerts,
    keyColumn="nodeId"
)

subscribeTable(tableName=`envStream, actionName="alert",
               handler=engine)

核心envAlert 函数在离线分析里测试过、确认阈值合理后,直接挂到流引擎上。不需要翻译成 Java,不需要重新部署 Flink 任务。传感器数据每进来一条,引擎自动计算是否触发告警。

7.3 历史回放验证

上线前用历史数据回放验证:

代码语言:javascript
复制
inputDS = replayDS(
    <select * from loadTable("dfs://env_monitor", "sensor")
     where date(ts) = 2024.06.15>,
    `ts
)
replay(inputDS, envStream, `ts, 1000, true, 2)

回放把历史数据按时间顺序注入流表,模拟传感器实时上报。验证检测逻辑没问题后,再接入真实的传感器数据源。


八、总结

8.1 优势点

1. segment() 是IoT神器

连续状态识别、会话划分、持续异常检测……这些在传统 SQL 里很别扭的操作,segment() 一行搞定。

2. bar() 时间对齐比 pandas resample 更直观

直接在 SQL 里 bar(ts, 60000) 就能做分钟聚合,不用 df.set_index('ts').resample('1min').mean() 这种三连操作。

3. 多指标相关性分析无需数据搬运

corr()可以直接在库内完成多指标相关系数计算,对于数据存储在 DolphinDB 中的场景,省去了先导出再分析的步骤。如果全程用 pandas DataFrame 处理数据,这一步确实不算额外优势;但当数据量超过单机内存时,库内计算的优势才会凸显。

4. context by 分组计算写起来顺手

mavgmstdffillprev 这些函数配合 context by 按节点分组,写法和 pandas 的 groupby().transform() 类似,但在引擎层面完成,不用把数据搬到 Python。

8.2 踩过的坑

1. 语法差异

从其他数据库迁移 SQL 时需要注意 DolphinDB 的一些语法差异,比如 case when 的写法、时间类型的表示方式等。

2. segment() 的间隔阈值要调对

断点检测的间隔阈值设太大会漏检,设太小会产生误判。建议先跑一遍 prev(ts) 的间隔统计,看看数据的实际分布再定阈值。

3. 没有内置的可视化

分析结果需要导出到 Grafana 或 Python 画图。DolphinDB 需要Grafana 插件,不过 Grafana 插件对接起来不复杂。

8.3 和 pandas 的对比

操作

pandas

DolphinDB

时间聚合

df.resample('1min').mean()

group by bar(timestamp, 1h)

前值填充

df.fillna(method='ffill')

ffill() + context by

移动平均

df.rolling(300).mean()

mavg(col, 300) + context by

相关系数

df.corr()

corr(col1, col2)

连续状态识别

diff().gt(x).cumsum()

segment()

数据规模

受限于单机内存

分布式,无内存限制

核心区别:pandas 的操作更灵活,但在数据量大的时候受内存限制。DolphinDB 的写法略不同,但不受单机内存约束,且不需要数据搬运。


九、总结

这次实测覆盖了 IoT 传感器数据分析中最常见的几个操作:数据导入、缺失值处理、时间聚合、异常检测、多指标关联、实时预警。整体感受是——DolphinDB 在"IoT 数据分析"场景下确实不错,有些操作比传统方案顺手

不是说所有 IoT 场景都需要它。如果你的传感器数量不多、只做简单的存取和看板,InfluxDB + Grafana 足够了。但如果你的分析需求包括:

  • 多指标关联分析(温度、湿度、空气质量的相互影响)
  • 持续异常检测(不只是单点超标,而是持续多久)
  • 不同时间粒度的聚合(秒级、分钟级、小时级自由切换)
  • 离线验证后的逻辑直接用于实时预警

那 DolphinDB 的"库内计算 + 流批一体"确实能减少很多数据搬运和代码重复的麻烦。

这次实测最大的收获不是某个具体功能,而是发现 DolphinDB 把"数据分析"这件事的门槛降低了——直接在数据库里用 SQL 就能完成。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景:环境传感器数据分析
    • 1.1 测试数据
    • 1.2 测试目标
    • 1.3 测试环境
  • 二、数据入库:从 CSV 到分布式时序表
    • 2.1 建表
    • 2.2 Python 导入
  • 三、缺失值处理:传感器数据的"断点"问题
    • 3.1 检测数据断点
    • 3.2 线性插值补缺
  • 四、时间聚合:从秒级到分钟/小时级
    • 4.1 按分钟聚合
    • 4.2 按小时聚合 + 日内趋势
    • 4.3 分位数统计
  • 五、异常检测:温度和CO2超标筛选
    • 5.1 简单阈值筛选
    • 5.2 持续异常检测:连续超标N分钟
    • 5.3 离群点检测:3-sigma
  • 六、多指标关联:温度、湿度和空气质量的关系
    • 6.1 相关系数
    • 6.2 分时段相关性
  • 七、实时预警:把离线逻辑搬到流计算上
    • 7.1 定义检测函数
    • 7.2 挂载到流引擎
    • 7.3 历史回放验证
  • 八、总结
    • 8.1 优势点
    • 8.2 踩过的坑
    • 8.3 和 pandas 的对比
  • 九、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档