
特性 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
存储后端 | Hadoop FileSystem (HDFS/S3/本地) | LevelDB 嵌入式数据库 | ZooKeeper 分布式协调服务 |
代码行数 | ~1018 行 | ~797 行 | ~1493 行 |
版本 | 1.3 | 1.1 | 1.5 |
依赖 | Hadoop FileSystem API | leveldbjni (JNI) | Apache Curator + Zookeeper |
架构类型 | 主从式 (共享存储) | 单机嵌入式 | 分布式共识 |
一致性模型 | 最终一致性 (依赖FS) | 强一致性 | 强一致性 (ZAB协议) |
hdfs://namenode:8020/rmstate/FSRMStateRoot/
├── RMVersionNode # 版本信息
├── EpochNode # Epoch计数器
├── RMDTSecretManagerRoot/ # 委托令牌
│ ├── DelegationKey_<id>
│ ├── RMDelegationToken_<seq>
│ └── RMDTSequenceNumber_<seq>
├── AMRMTokenSecretManagerRoot/ # AMRM令牌
│ └── AMRMTokenSecretManagerNode
├── RMAppRoot/ # 应用状态
│ └── <ApplicationId>/
│ ├── <ApplicationId>
│ └── attempt_<id>
├── ReservationSystemRoot/ # 资源预留
│ └── <PlanName>/
└── ProxyCARoot/ # CA证书
├── caCert
└── caPrivateKey特点:
数据库: yarn-rm-state (LevelDB)
Key | Value
--------------------------------------|------------------------------
RMVersionNode | VersionProto
EpochNode | EpochProto
RMDTSecretManagerRoot/DelegationKey_<id> | DelegationKey
RMDTSecretManagerRoot/RMDelegationToken_<seq> | TokenData
RMDTSecretManagerRoot/RMDTSequentialNumber | Int
RMAppRoot/application_<id> | ApplicationStateDataProto
AMRMTokenSecretManagerRoot | AMRMTokenStateProto
ProxyCARoot/caCert | X509Certificate (DER)
ProxyCARoot/caPrivateKey | PrivateKey (DER)
ReservationSystemRoot/<plan>/<res_id> | ReservationProto特点:
/)/yarn rm/ZKRMStateRoot/
├── VERSION_INFO # 版本
├── EPOCH_NODE # Epoch
├── RM_ZK_FENCING_LOCK # 锁节点 (HA)
├── RMAppRoot/
│ ├── HIERARCHIES/ # 分层索引 (1-4)
│ │ ├── 1/application_1
│ │ │ └── application_1234_0001_01 # 尝试状态
│ │ ├── 2/application_12
│ │ │ └── application_34_0001_01
│ │ └── ...
│ └── application_<id>
│ └── attempt_<id>
├── RMDTSecretManagerRoot/
│ ├── RMDTSequentialNumber # 序列号
│ ├── RMDTMasterKeysRoot/
│ │ └── Key_<id>
│ └── RMDelegationTokensRoot/
│ ├── 1/Token_1
│ │ └── Token_1234
│ └── ...
├── AMRMTokenSecretManagerRoot/
│ ├── currentMasterKey
│ └── nextMasterKey
├── ReservationSystemRoot/
│ └── <PlanName>/
│ └── <ReservationId>
└── ProxyCARoot/
├── caCert
└── caPrivateKey特点:
实现 | 存储方式 | 原子性保证 |
|---|---|---|
FileSystemRMStateStore | 临时文件 + rename (2-3次IO) | 依赖文件系统 rename |
LeveldbRMStateStore | 直接 db.put() (1次IO) | LevelDB 事务 |
ZKRMStateStore | safeCreate() + 事务 (多次IO) | ZK 事务 (Multi) |
// FileSystemRMStateStore - 临时文件策略
protected void writeFile(Path outputPath, byte[] data, boolean makeUnreadableByAdmin) {
Path tempPath = new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
fsOut = fs.create(tempPath, true);
fsOut.write(data);
fsOut.close();
fs.rename(tempPath, outputPath); // 原子重命名
}
// LeveldbRMStateStore - 直接写入
protected void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateData) {
db.put(bytes(key), appStateData.getProto().toByteArray());
}
// ZKRMStateStore - 安全创建
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
}实现 | 令牌存储 | 序列号存储 | 原子性 |
|---|---|---|---|
FileSystemRMStateStore | 单独文件 | 单独文件 | ❌ 不保证 |
LeveldbRMStateStore | WriteBatch | WriteBatch | ✅ 保证 |
ZKRMStateStore | 事务 | 事务 | ✅ 保证 |
// FileSystemRMStateStore - 分开存储,存在不一致风险
private void storeOrUpdateRMDelegationTokenState(...) {
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray(), true);
// 序列号单独存储...
if (dtSequenceNumberPath == null) {
createFileWithRetries(latestSequenceNumberPath);
} else {
renameFileWithRetries(dtSequenceNumberPath, latestSequenceNumberPath);
}
}
// LeveldbRMStateStore - 原子批量操作
private void storeOrUpdateRMDT(...) {
try (WriteBatch batch = db.createWriteBatch()) {
batch.put(bytes(tokenKey), tokenData.toByteArray());
if (!isUpdate) {
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
}
db.write(batch); // 原子提交
}
}
// ZKRMStateStore - ZK 事务
protected synchronized void storeRMDelegationTokenState(...) {
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl, CreateMode.PERSISTENT);
trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
trx.commit(); // 原子提交
}实现 | 应用删除 | 尝试删除 | 批量删除 |
|---|---|---|---|
FileSystemRMStateStore | 递归删除目录 | 单独删除 | ❌ 无 |
LeveldbRMStateStore | WriteBatch 删除 | WriteBatch 删除 | ✅ 支持 |
ZKRMStateStore | safeDelete + 层级清理 | safeDelete | ✅ 支持 |
特性 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
HA 支持 | ✅ 支持 (共享存储) | ❌ 不支持 | ✅ 支持 |
Fencing 机制 | ❌ 无 | ❌ 无 | ✅ 有 |
Epoch 管理 | ✅ 有 | ✅ 有 | ✅ 有 |
脑裂防护 | 依赖共享存储 | 不支持 | ZK 锁 |
// Fencing 锁节点
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private String fencingNodePath;
// 验证活跃状态线程
private class VerifyActiveStatusThread extends Thread {
@Override
public void work() {
while (!isFencedState()) {
// 周期性创建/删除 fence 节点
zkManager.createTransaction(zkAcl, fencingNodePath).commit();
Thread.sleep(zkSessionTimeout);
}
}
}
// 安全事务 - 带 fencing
public void safeCreate(String path, ...) {
SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
trx.create(path, data, acl, mode);
trx.commit();
}// 应用ID分层存储 - 避免单节点children过多
// 配置: yarn.resourcemanager.zk-appid-node-split-index
// splitIndex=2 时: application_12 / application_34_0001
private Map<Integer, String> rmAppRootHierarchies;
for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
rmAppRootHierarchies.put(splitIndex,
getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
}实现 | 机制 | 依赖 |
|---|---|---|
FileSystemRMStateStore | .tmp/.new 文件 + rename | 文件系统原子性 |
LeveldbRMStateStore | WriteBatch | LevelDB WAL |
ZKRMStateStore | SafeTransaction (Multi) | ZAB 协议 |
实现 | 恢复机制 | 不完整记录处理 |
|---|---|---|
FileSystemRMStateStore | 手动清理 .tmp 文件 | checkAndRemovePartialRecord() |
LeveldbRMStateStore | 自动恢复 | LevelDB 自动应用 WAL |
ZKRMStateStore | 自动恢复 | ZK 自动处理 |
实现 | 重试方式 | 配置参数 |
|---|---|---|
FileSystemRMStateStore | FSAction.runWithRetries() | fsNumRetries=15, fsRetryInterval=1000ms |
LeveldbRMStateStore | LevelDB 内置 | 无需配置 |
ZKRMStateStore | Curator 内置重试 | zkSessionTimeout, retryPolicy |
操作 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
存储应用状态 | 2-3 次 (tmp+rename) | 1 次 (put) | 1-2 次 |
更新应用状态 | 3 次 (new+delete+rename) | 1 次 (put) | 1 次 |
删除应用 | N+1 次 | 1 次 (batch) | 多次 (递归) |
存储令牌+序列号 | 2-4 次 | 1 次 (batch) | 1 次 (事务) |
特性 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
数据索引 | 无 (文件名) | B+Tree | Znode 路径 |
范围查询 | 低效 | 高效 | 中等 |
压缩 | 无 | Snappy | 无 |
缓存 | HDFS DataNode | BlockCache | ZK Client Cache |
节点限制 | 无 | 磁盘容量 | ZK 单节点 1MB |
实现 | 权限机制 | 加密支持 |
|---|---|---|
FileSystemRMStateStore | HDFS ACLs, XAttr | HDFS 加密区域 |
LeveldbRMStateStore | 文件系统权限 | 无 |
ZKRMStateStore | ZK ACLs (Digest/SASL) | SSL/TLS |
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property>
<property>
<name>yarn.resourcemanager.fs.state-store.uri</name>
<value>hdfs://namenode:8020/rmstate</value>
</property>
<property>
<name>yarn.resourcemanager.fs.state-store.num-retries</name>
<value>15</value>
</property>
<property>
<name>yarn.resourcemanager.fs.state-store.retry-interval-ms</name>
<value>1000</value>
</property><property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore</value>
</property>
<property>
<name>yarn.resourcemanager.leveldb.state-store.path</name>
<value>/var/log/yarn/rmstate</value>
</property>
<property>
<name>yarn.resourcemanager.leveldb.compaction-interval-secs</name>
<value>3600</value>
</property><property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
<name>yarn.resourcemanager.zk.state-store.parent-path</name>
<value>/yarn rm</value>
</property>
<property>
<name>yarn.resourcemanager.zk.timeout-ms</name>
<value>10000</value>
</property>
<property>
<name>yarn.resourcemanager.zk.num-retries</name>
<value>3</value>
</property>
<property>
<name>yarn.resourcemanager.zk.znode-size-limit-bytes</name>
<value>10485760</value>
</property>优点 | 缺点 |
|---|---|
✅ 共享存储支持,适合 HA | ❌ IO 开销大 |
✅ 生态系统集成 | ❌ 原子性依赖外部文件系统 |
✅ 跨平台 (HDFS/S3/本地) | ❌ 令牌和序列号分开存储 |
✅ 简单直观 | ❌ 性能一般 |
优点 | 缺点 |
|---|---|
✅ 高性能 (嵌入式) | ❌ 单机限制 |
✅ WriteBatch 原子性 | ❌ 无法跨机器共享 |
✅ 内置索引和压缩 | ❌ 不适合 HA |
✅ 故障自动恢复 | ❌ 容量受单机磁盘限制 |
优点 | 缺点 |
|---|---|
✅ 分布式一致性 | ❌ 依赖 ZK 集群 |
✅ HA 成熟方案 | ❌ ZK 节点大小限制 (1MB) |
✅ Fencing 机制 | ❌ 性能受网络延迟影响 |
✅ ACL 权限控制 | ❌ 需要额外维护 ZK |
场景选择决策树
│
▼
┌───────────────────────────┐
│ 是否有 HA 需求? │
└───────────────────────────┘
/ \
是 否
/ \
┌─────────────────┐ ┌─────────────────┐
│ 是否有共享存储? │ │ 性能要求如何? │
└─────────────────┘ └─────────────────┘
/ \ / \
是 否 高 低
/ \ / \
┌──────────────────┐ ┌──────────┐┌──────────┐ ┌──────────────┐
│ FileSystemRMState│ │ ZKRM ││ Leveldb │ │ MemoryRM │
│ Store (HDFS) │ │StateStore││StateStore│ │ StateStore │
└──────────────────┘ └──────────┘└──────────┘ └──────────────┘场景 | 推荐方案 | 原因 |
|---|---|---|
生产 HA 集群 | ZKRMStateStore | 成熟 HA 方案,fencing 支持 |
HDFS 已有 | FileSystemRMStateStore | 共享存储利用 |
开发测试 | LeveldbRMStateStore | 简单高性能 |
小规模/边缘 | LeveldbRMStateStore | 无外部依赖 |
超大规模集群 | ZKRMStateStore | 分层优化 |
版本 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
1.0 | 初始版本 | 初始版本 | 初始版本 |
1.1 | - | 预留系统 | - |
1.2 | AMRMToken 分离 | - | AMRMToken 分离 |
1.3 | 预留系统 | - | 预留系统 |
1.4 | - | - | 应用节点分层 |
1.5 | - | - | 令牌节点分层 |
维度 | FileSystemRMStateStore | LeveldbRMStateStore | ZKRMStateStore |
|---|---|---|---|
架构 | 主从 (共享存储) | 单机嵌入 | 分布式共识 |
一致性 | 最终一致 | 强一致 | 强一致 |
HA | 依赖存储 | 不支持 | 原生支持 |
性能 | 中 | 高 | 中 |
复杂度 | 低 | 低 | 高 |
运维 | 简单 | 简单 | 需维护 ZK |
三种实现各有适用场景,选择时需综合考虑: