SparkStreaming Fails to Consume Kafka Messages, and Message "Couldn't find leader offsets" Is Displayed
Symptom
When SparkStreaming is used to consume messages of a specified Kafka topic, no data can be obtained.
The following error message is displayed: Couldn't find leader offsets.
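The symptom typically appears in an application similar to the following minimal sketch, which assumes the spark-streaming-kafka (Kafka 0.8) direct-stream API; the broker addresses, port, topic name, and batch interval are placeholders, not values from this case. The exception is raised while the driver queries the brokers for partition leaders and offsets, so the application code itself is usually not at fault.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaDirectConsumer {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaDirectConsumer"), Seconds(10))
    // Placeholder broker list and topic; replace with real values for your cluster.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:21005,broker2:21005")
    val topics = Set("example_topic")
    // "Couldn't find leader offsets" is thrown here (and when offsets are computed for
    // each batch) if the driver cannot obtain partition leader offsets from the brokers.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}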
Possible Causes
- The Kafka service is abnormal.
- The network is abnormal.
- The Kafka topic is abnormal.
Cause Analysis
- On Manager, check the status of the Kafka cluster. The status is Good, and the monitoring metrics are correctly displayed.
- Check the SparkStreaming log to identify the topic reported in the error.
Run the following Kafka command to query the topic's partition assignment and replica synchronization information, and check the result:
kafka-topics.sh --describe --zookeeper <zk_host:port/chroot> --topic <topic name>
If output similar to the sample below is returned, the topic is normal: all partitions have valid leader information.
Figure 1 Topic partition assignment and replica synchronization information
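For reference, the output for a healthy topic generally resembles the following; the topic name, broker IDs, and partition counts are illustrative only.
Topic:example_topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: example_topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: example_topic	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: example_topic	Partition: 2	Leader: 3	Replicas: 3,1	Isr: 3,1
A partition whose Leader is -1 or whose Isr list is empty would indicate an abnormal topic; that is not the case here.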
- Check whether the network connection between the client and the Kafka cluster is normal. If it is not, contact the network team to rectify the fault.
- Log in to Kafka Broker using SSH.
Run the cd /var/log/Bigdata/kafka/broker command to go to the log directory.
Checking server.log reveals the following error:
2018-05-30 12:02:00,246 | ERROR | [kafka-network-thread-6-PLAINTEXT-3] | Processor got uncaught exception. | kafka.network.Processor (Logging.scala:103)
java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:694)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
	at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
	at sun.nio.ch.IOUtil.read(IOUtil.java:195)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
- On Manager, check the configuration of the current Kafka cluster.
- MRS Manager: Log in to MRS Manager and choose Services > Kafka > Service Configuration. Set Type to All. The value of -XX:MaxDirectMemorySize in KAFKA_JVM_PERFORMANCE_OPTS is 1G.
- FusionInsight Manager: Log in to FusionInsight Manager. Choose Cluster > Services > Kafka > Configurations > All Configurations. The value of -XX:MaxDirectMemorySize in KAFKA_JVM_PERFORMANCE_OPTS is 1G.
- The configured direct memory is too small for the current load, so the preceding error is reported. Once the direct memory is exhausted, the node cannot process new requests, and other nodes or clients time out when accessing it (see the illustration below).
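The overflow mechanism can be reproduced outside Kafka with the following standalone sketch (illustrative code, not part of MRS or Kafka). Run it with a deliberately small limit, for example -XX:MaxDirectMemorySize=16M, and it fails with the same java.lang.OutOfMemoryError: Direct buffer memory that appears in server.log.
object DirectMemoryDemo {
  def main(args: Array[String]): Unit = {
    // Keep references so the buffers cannot be garbage collected.
    val buffers = scala.collection.mutable.ArrayBuffer.empty[java.nio.ByteBuffer]
    while (true) {
      // Each call reserves 1 MB of off-heap (direct) memory, similar to what the broker's
      // network threads do when reading requests; once the limit set by
      // -XX:MaxDirectMemorySize is reached, allocateDirect throws
      // java.lang.OutOfMemoryError: Direct buffer memory.
      buffers += java.nio.ByteBuffer.allocateDirect(1024 * 1024)
    }
  }
}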
Solution
- Log in to Manager and go to the Kafka configuration page.
- Set Type to All, search for KAFKA_JVM_PERFORMANCE_OPTS, and increase the value of -XX:MaxDirectMemorySize (see the example after these steps).
- Save the configuration and restart the service or instance whose configuration has expired.
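For example, if the relevant fragment of KAFKA_JVM_PERFORMANCE_OPTS is currently:
-XX:MaxDirectMemorySize=1G
increase it to a value sized for the broker's actual traffic, for instance:
-XX:MaxDirectMemorySize=2G
The 2G figure is only an illustration; choose the value based on the cluster's load and the memory available on the Broker nodes.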