首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >远程分区:工作者端序列化问题

远程分区:工作者端序列化问题
EN

Stack Overflow用户
提问于 2020-10-22 23:33:24
回答 1查看 207关注 0票数 0

当运行worker but exception时,我使用了org.springframework.kafka.support.serializer.JsonDeserializer和org.apache.kafka.common.serialization.ByteArrayDeserializer,它们在主控端正常工作,但在worker端出现异常.最后我得到的原因是: java.lang.StackOverflowError: null

代码语言:javascript
复制
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply], failedMessage=GenericMessage [payload=StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, sequenceSize=10, kafka_timestampType=CREATE_TIME, kafka_replyTopic=reply, kafka_receivedTopic=requests, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, kafka_offset=5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1a6c92e3, kafka_correlationId=[B@ef1f7b2, correlationId=1:workerStep, id=49d312f5-618f-5085-fad1-7868e8e06aa0, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1603368744168, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = requests, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1603368744168, serialized key size = -1, serialized value size = 64, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [49]), RecordHeader(key = sequenceSize, value = [49, 48]), RecordHeader(key = correlationId, value = [49, 58, 119, 111, 114, 107, 101, 114, 83, 116, 101, 112]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 107, 97, 102, 107, 97, 95, 114, 101, 112, 108, 121, 84, 111, 112, 105, 99, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-127, -49, 126, 33, 100, 99, 79, -14, -81, -35, -70, -88, -61, 37, 61, -63])], isReadOnly = false), key = null, value = StepExecutionRequest: [jobExecutionId=1, stepExecutionId=5, stepName=workerStep]), kafka_groupId=repliesGroup, timestamp=1603379041885}]
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply], failedMessage=GenericMessage [payload=StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, sequenceSize=10, kafka_timestampType=CREATE_TIME, kafka_replyTopic=reply, kafka_receivedTopic=requests, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@454b7395, kafka_offset=5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1a6c92e3, kafka_correlationId=[B@ef1f7b2, correlationId=1:workerStep, id=49d312f5-618f-5085-fad1-7868e8e06aa0, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1603368744168, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = requests, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1603368744168, serialized key size = -1, serialized value size = 64, headers = RecordHeaders(headers = [RecordHeader(key = sequenceNumber, value = [49]), RecordHeader(key = sequenceSize, value = [49, 48]), RecordHeader(key = correlationId, value = [49, 58, 119, 111, 114, 107, 101, 114, 83, 116, 101, 112]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = spring_json_header_types, value = [123, 34, 115, 101, 113, 117, 101, 110, 99, 101, 78, 117, 109, 98, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 83, 105, 122, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 114, 114, 101, 108, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 107, 97, 102, 107, 97, 95, 114, 101, 112, 108, 121, 84, 111, 112, 105, 99, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125]), RecordHeader(key = kafka_replyTopic, value = [114, 101, 112, 108, 121]), RecordHeader(key = kafka_correlationId, value = [-127, -49, 126, 33, 100, 99, 79, -14, -81, -35, -70, -88, -61, 37, 61, -63])], isReadOnly = false), key = null, value = StepExecutionRequest: [jobExecutionId=1, stepExecutionId=5, stepName=workerStep]), kafka_groupId=repliesGroup, timestamp=1603379041885}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1902) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'handler'; defined in: 'batch.configuration.WorkerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@7f811d00']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=5, version=93, name=workerStep:partition4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=31, rollbackCount=0, exitDescription=] for topic [reply]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.

工人代码

代码语言:javascript
复制
@SpringBootApplication
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
@ImportResource("context.xml")
public class WorkerConfiguration {

    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
    private final JobBuilderFactory jobBuilderFactory;

    @Autowired
    ProducerFactory<String, String> producerFactory;

    @Autowired
    public JobExplorer jobExplorer;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public WorkerConfiguration(JobBuilderFactory jobBuilderFactory, RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    /*
     * Configure inbound flow (requests coming from the master)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    /*
     * Configure outbound flow (replies going to the master)
     */
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    /*
        protected JobRepository createMyJobRepository() throws Exception {
            JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
            factory.setTransactionManager(new ResourcelessTransactionManager());
            factory.setDataSource(createDataSourceForRepository());
            factory.setDatabaseType("HSQL");
            return factory.getObject();
        }


        public DataSource createDataSourceForRepository() {
            return DataSourceBuilder.create()
                    .url("jdbc:hsqldb:file:src/main/resources/hsqldb/batchcore.db;hsqldb.lock_file=false;shutdown=true;")
                    .driverClassName("org.hsqldb.jdbcDriver")
                    .username("sa")
                    .password("")
                    .build();
        }


        public DataSource createDataSourceForRepository() {
            return DataSourceBuilder.create()
                    .url("jdbc:postgresql://localhost:5432/bdauser")
                    .driverClassName("org.postgresql.Driver")
                    .username("bdauser")
                    .password("bdauser")
                    .build();
        }



        @Bean
        public BatchConfigurer batchConfigurer() {
            return new DefaultBatchConfigurer(createDataSourceForRepository()) {
                @Override
                public JobRepository getJobRepository() {
                    JobRepository jobRepository = null;
                    try {
                        jobRepository = createMyJobRepository();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("************************WORKER  replying Inside  batchConfigurer    ****************");
                    return jobRepository;
                }
            };
        }

    */


    @Bean
    @ServiceActivator(inputChannel = "replies")
    public MessageHandler handler() throws Exception {
        System.out.println("************************  worker inside serviceactivator **********************");
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression("reply"));
        return handler;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
         return new KafkaTemplate<>(producerFactory);
    }


        @Bean
        public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
            ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset("nullChannel", 0));
            System.out.println("************************** WORKER  ContainerProperties   *****************************");
            containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return new KafkaMessageListenerContainer<>(cf, containerProperties);
        }


        @Primary
        @Bean
        public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, KafkaMessageListenerContainer<String, String> repliesContainer) {
            System.out.println("**************************WORKER  replying Template Templet  *****************************");
            ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory,repliesContainer);
            replyingKafkaTemplate.setSharedReplyTopic(true);
            Duration d = Duration.ofSeconds(50);
            replyingKafkaTemplate.setDefaultReplyTimeout(d);
            return replyingKafkaTemplate;
        }

        @Bean
        public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
            System.out.println("**************** worker node ConcurrentMessageListenerContainer  *****************************");
            ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("requests");
            repliesContainer.getContainerProperties().setGroupId("repliesGroup");
            repliesContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            repliesContainer.setAutoStartup(false);
            return repliesContainer;
        }


        @Bean
        public IntegrationFlow serverGateway(ConcurrentMessageListenerContainer<String, String> container, KafkaTemplate<String, String> template) {
            return IntegrationFlows
                   .from(Kafka.inboundGateway(container, template)
                   .replyTimeout(3000000))
                   .channel(requests())
                   .get();
       }


        @Bean
        public Job remotePartitioningJob() {
            System.out.println("*******************  inside  remotePartitioningJob **************************");
            return this.jobBuilderFactory.get("remotePartitioningJobMy")
                    .incrementer(new RunIdIncrementer())
                    .start(workerStep())
                    .build();
        }

        /*
         * Configure the worker step
         */
    @Bean
    public Step workerStep() {
        System.out.println("*******************  inside  worker step **************************");
        return this.workerStepBuilderFactory.get("workerStep")
                .inputChannel(requests())
                .outputChannel(replies())
                .tasklet(tasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
        return (contribution, chunkContext) -> {
            System.out.println("processing " + partition);
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public PartitionHandler partitionHandler() throws Exception {
        MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
        partitionHandler.setStepName("slaveStep");
        partitionHandler.setGridSize(10);
        partitionHandler.setMessagingOperations(messageTemplate());
        partitionHandler.setPollInterval(5000l);
        partitionHandler.setJobExplorer(this.jobExplorer);
        partitionHandler.afterPropertiesSet();
        return partitionHandler;
    }


    @Bean
    public MessagingTemplate messageTemplate() {
        MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
        messagingTemplate.setReceiveTimeout(60000000l);
        return messagingTemplate;
    }


    @Bean
    public ExecutorChannel outboundRequests() {
        return MessageChannels.executor("outboundRequests", new SimpleAsyncTaskExecutor()).get();
    }


    public static void main(String[] args) {
        System.out.println("************************WORKER   CONFIGURATION    ****************");
        SpringApplication.run(WorkerConfiguration.class, args);
    }
}

属性文件

代码语言:javascript
复制
spring.main.allow-bean-definition-overriding=true
spring.kafka.bootstrap-servers=cdh5161-e2e-test-1.eaas.amdocs.com:9092,cdh5161-e2e-test-2.eaas.amdocs.com:9092,cdh5161-e2e-test-3.eaas.amdocs.com:9092,cdh5161-e2e-test-4.eaas.amdocs.com:9092,cdh5161-e2e-test-5.eaas.amdocs.com:9092,cdh5161-e2e-test-6.eaas.amdocs.com:9092,cdh5161-e2e-test-7.eaas.amdocs.com:9092,cdh5161-e2e-test-8.eaas.amdocs.com:9092
spring.kafka.consumer.group-id=remotePartitioningConsuerGroup

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.properties.spring.json.add.type.headers=false 

spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.properties.spring.json.trusted.packages=*
server.port=8050

## PostgreSQL
spring.datasource.url=jdbc:postgresql://localhost:5432/bdauser
spring.datasource.username=bdauser
spring.datasource.password=bdauser

#drop n create table again, good for testing, comment this in production
spring.jpa.hibernate.ddl-auto=create
EN

回答 1

Stack Overflow用户

发布于 2021-09-16 11:13:41

因为你使用的是json.StepExecution类和JobExecution类有双向的关系。因此,在尝试序列化要发送回应答通道的StepExecution对象时,它会导致无限递归。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64485624

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档