我已经设置了本地kafka集群和本地微服务端点。我想要的是向httpsink提供像{"foo":"bar"}这样的主题消息,这样它就可以用json调用API端点。
我没有使用模式注册中心。
下面是我的合流httpsink连接器配置,
{
"name": "httpsink3",
"config": {
"name": "httpsink3",
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topics": "test3",
"http.api.url": "http://localhost:5000/out",
"reporter.result.topic.name": "success-responses",
"reporter.result.topic.replication.factor": "1",
"reporter.error.topic.name": "error-responses",
"reporter.error.topic.replication.factor": "1",
"reporter.bootstrap.servers": "localhost:9092",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}
}结果是
decision-decision-1 | [2022-12-01 15:10:52,500] DEBUG in routes 79: request.headers=Content-Length: 13
decision-decision-1 | Content-Type: text/plain; charset=UTF-8
decision-decision-1 | Host: localhost:5000
decision-decision-1 | Connection: Keep-Alive
decision-decision-1 | User-Agent: Apache-HttpClient/4.5.13 (Java/11.0.17)
decision-decision-1 | Accept-Encoding: gzip,deflate
decision-decision-1 |
decision-decision-1 |
decision-decision-1 | [2022-12-01 15:10:52,501] DEBUG in routes 80: request.data={"foo":"bar"}
decision-decision-1 | [2022-12-01 15:10:52,502] DEBUG in routes 86: is json str我期望将输出消息头设置为application/json,而不是text/平原
我也尝试过使用选项"value.converter": "org.apache.kafka.connect.json.JsonConverter",,但更糟糕的是,它保留了文本标题,但将内容更改为{"foo"="bar"}
有什么办法来配置这个吗?
非常感谢!
发布于 2022-12-01 15:56:55
已修复。遵循这个https://docs.confluent.io/kafka-connectors/http/current/overview.html#header-forwarding-example
并将标题选项更改为" headers ":“Content: application/json”
https://stackoverflow.com/questions/74643641
复制相似问题