java后端系统架构之消息队列篇:kafka的实验
分布式队列kafka
·
1.kafka安装
tar -xzf kafka_2.10-0.8.2.1.tgz
cd kafka_2.10-0.8.2.1
2.启动
主机环境
top - 11:32:15 up 118 days, 8:18, 4 users, load average: 1.97, 3.45, 4.45
Tasks: 152 total, 1 running, 151 sleeping, 0 stopped, 0 zombie
Cpu0 : 15.0%us, 13.0%sy, 0.0%ni, 68.0%id, 3.0%wa, 0.0%hi, 1.0%si, 0.0%st
Cpu1 : 26.2%us, 15.0%sy, 0.0%ni, 56.8%id, 0.0%wa, 0.0%hi, 2.0%si, 0.0%st
Cpu2 : 62.7%us, 13.7%sy, 0.0%ni, 18.3%id, 0.0%wa, 0.0%hi, 5.3%si, 0.0%st
Cpu3 : 16.6%us, 14.0%sy, 0.0%ni, 66.1%id, 2.7%wa, 0.0%hi, 0.7%si, 0.0%st
Tasks: 152 total, 1 running, 151 sleeping, 0 stopped, 0 zombie
Cpu0 : 15.0%us, 13.0%sy, 0.0%ni, 68.0%id, 3.0%wa, 0.0%hi, 1.0%si, 0.0%st
Cpu1 : 26.2%us, 15.0%sy, 0.0%ni, 56.8%id, 0.0%wa, 0.0%hi, 2.0%si, 0.0%st
Cpu2 : 62.7%us, 13.7%sy, 0.0%ni, 18.3%id, 0.0%wa, 0.0%hi, 5.3%si, 0.0%st
Cpu3 : 16.6%us, 14.0%sy, 0.0%ni, 66.1%id, 2.7%wa, 0.0%hi, 0.7%si, 0.0%st
启动kafka zookeeper进程
bin/zookeeper-server-start.sh config/zookeeper.properties &
启动leader
bin/kafka-server-start.sh config/server.properties &
启动follower 1
bin/kafka-server-start.sh config/server-1.properties &
启动follower 2
bin/kafka-server-start.sh config/server-2.properties &
3.实验准备
1.创建kafka主题队列(后文 --describe 的输出显示该主题实际有10个分区,消费端也使用10个线程,因此这里按10个分区创建)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic page_visits1
2.查看Kafka主题队列是否加入数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits1 --from-beginning
4.常见异常
解决FailedToSendMessageException问题。
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
修改config/server.properties中 host.name=主机地址
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
修改config/server.properties中 host.name=主机地址
5.写入主题消息
TestProducer
package kafka.client;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Load generator for the experiment: publishes page-visit records to the
 * {@code page_visits1} topic and prints the elapsed wall-clock time.
 */
public class TestProducer {
    /**
     * Sends 1,000,000 messages of the form {@code <millis>,www.example.com,<ip>}
     * and prints {@code cost time:<elapsed millis>} when the loop completes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        long events = 1000000;
        Random rnd = new Random();

        Properties props = new Properties();
        //props.put("zk.connect", "192.168.161.73:2181");
        props.put("metadata.broker.list", "192.168.161.73:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("zookeeper.session.timeout.ms", "400000");
        props.put("partitioner.class", "kafka.client.SimplePartitioner");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            long begin = System.currentTimeMillis();
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = System.currentTimeMillis();
                // nextInt(255) yields 0..254, so the last octet is never 255.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + ",www.example.com," + ip;
                // NOTE(review): the third argument is presumably the partition key handed
                // to the configured partitioner -- confirm against the KeyedMessage API.
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("page_visits1", runtime + "", ip, msg);
                producer.send(data);
            }
            long end = System.currentTimeMillis();
            System.out.println("cost time:" + (end - begin));
        } finally {
            // Release the producer even when send() throws
            // (e.g. FailedToSendMessageException after the retries are exhausted).
            producer.close();
        }
    }
}
6.消费主题消息
SimplePartitioner(可选):生产者端的自定义分区器,由 TestProducer 中的 partitioner.class 属性引用
package kafka.client;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
 * Partitioner that derives the partition from the numeric text following the
 * last {@code '.'} of the key (for an IPv4 address string, the final octet).
 */
public class SimplePartitioner implements Partitioner {

    /**
     * Constructor taking the producer properties; they are not used here.
     *
     * @param props producer properties (ignored)
     */
    public SimplePartitioner(VerifiableProperties props) {
    }

    /**
     * Maps a key to a partition index.
     *
     * @param key             the partitioning key; cast to {@code String}
     * @param a_numPartitions divisor applied to the parsed suffix
     * @return the parsed suffix modulo {@code a_numPartitions}, or {@code 0} when the key
     *         contains no {@code '.'} or its only {@code '.'} is the first character
     */
    public int partition(Object key, int a_numPartitions) {
        final String text = (String) key;
        final int lastDot = text.lastIndexOf('.');
        if (lastDot <= 0) {
            // No usable suffix: fall back to partition 0.
            return 0;
        }
        final String suffix = text.substring(lastDot + 1);
        return Integer.parseInt(suffix) % a_numPartitions;
    }
}
ConsumerGroupExample
package kafka.client;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Consumer-group driver: opens a consumer connector, creates a number of streams
 * for one topic and hands each stream to a {@code ConsumerTest} worker running on
 * a fixed-size thread pool.
 */
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    /** Worker pool; stays {@code null} until {@link #run(int)} has been called. */
    private ExecutorService executor;

    /**
     * Creates the consumer connector.
     *
     * @param a_zookeeper value for {@code zookeeper.connect}
     * @param a_groupId   value for {@code group.id}
     * @param a_topic     topic to consume
     */
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Shuts down the connector and then the worker pool, waiting up to 5 seconds
     * for the workers to finish. Safe to call even if {@link #run(int)} was never
     * called.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor == null) {
            // run() was never called, so there are no worker threads to wait for.
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests {@code a_numThreads} streams for the topic and submits one
     * {@code ConsumerTest} per returned stream to a pool of the same size.
     *
     * @param a_numThreads number of streams requested and size of the thread pool
     */
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Launch the worker pool.
        executor = Executors.newFixedThreadPool(a_numThreads);

        // One worker per stream, numbered from 0 in list order.
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration.
     *
     * @param a_zookeeper value for {@code zookeeper.connect}
     * @param a_groupId   value for {@code group.id}
     * @return the populated consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        // NOTE(review): a 400 ms session timeout is very short and may cause connection
        // timeouts on a loaded host -- confirm whether a larger value is intended.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Runs ten consumer threads against {@code page_visits1} for ten seconds and
     * then shuts everything down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String zooKeeper = "192.168.161.73:2181";
        String groupId = "testgroup";
        String topic = "page_visits1";
        int threads = Integer.parseInt("10");
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
        try {
            // Let the workers consume for a fixed 10 s window; this bounds every
            // worker's reported "cost time".
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // Stop waiting early but keep the interrupt status.
            Thread.currentThread().interrupt();
        }
        example.shutdown();
    }
}
ConsumerTest
package kafka.client;
import java.util.concurrent.Callable;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
/**
 * Worker that drains one Kafka stream, printing every 10,000th message and a
 * final summary with the elapsed time and the number of messages seen.
 */
public class ConsumerTest implements Runnable {
    private final KafkaStream<byte[], byte[]> m_stream;
    private final int m_threadNumber;

    /**
     * @param a_stream       stream to consume
     * @param a_threadNumber number used to label this worker's output
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw parameter kept for existing callers
    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    /**
     * Iterates the stream until {@code hasNext()} returns false, then prints the
     * total elapsed milliseconds and the message count.
     */
    public void run() {
        long begin = System.currentTimeMillis();
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // Report the time actually spent obtaining the iterator.
        System.out.println("stream.iterator cost time=" + (System.currentTimeMillis() - begin));

        int s = 0;
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            if (s % 10000 == 0) {
                // Decode the payload bytes; String.valueOf(byte[]) would only print the
                // array's identity ("[B@..."). The producer in this experiment uses
                // StringEncoder -- UTF-8 is assumed here, confirm if the encoder's
                // encoding is overridden.
                String val = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Thread " + m_threadNumber + ": " + val);
            }
            s++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
        long end = System.currentTimeMillis();
        System.out.println("thread " + m_threadNumber + " cost time=" + (end - begin) + ",queue counter=" + s);
    }
}
7.测试
1.运行TestProducer,向page_visits1主题发布消息。
第一次运行
./run-producter.sh
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
cost time:150936 。即150秒.
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
cost time:150936 。即150秒.
第二次运行:
./run-producter.sh
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
cost time:149937。即149秒,
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.TestProducer
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
cost time:149937。即149秒,
2.运行ConsumerGroupExample,消费page_visits1主题消息
第一次运行:
./run-consumer.sh
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Shutting down Thread: 9
thread 9 cost time=10029,queue counter=98576
Shutting down Thread: 4
thread 4 cost time=10045,queue counter=97970
Shutting down Thread: 0
thread 0 cost time=10048,queue counter=101707
Shutting down Thread: 8
thread 8 cost time=10030,queue counter=101870
Shutting down Thread: 1
thread 1 cost time=10048,queue counter=97799
Shutting down Thread: 7
thread 7 cost time=10038,queue counter=97865
Shutting down Thread: 5
Shutting down Thread: 6
thread 5 cost time=10038,queue counter=102579
Shutting down Thread: 2
thread 2 cost time=10047,queue counter=102181
thread 6 cost time=10045,queue counter=97463
Shutting down Thread: 3
thread 3 cost time=10047,queue counter=101990
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Shutting down Thread: 9
thread 9 cost time=10029,queue counter=98576
Shutting down Thread: 4
thread 4 cost time=10045,queue counter=97970
Shutting down Thread: 0
thread 0 cost time=10048,queue counter=101707
Shutting down Thread: 8
thread 8 cost time=10030,queue counter=101870
Shutting down Thread: 1
thread 1 cost time=10048,queue counter=97799
Shutting down Thread: 7
thread 7 cost time=10038,queue counter=97865
Shutting down Thread: 5
Shutting down Thread: 6
thread 5 cost time=10038,queue counter=102579
Shutting down Thread: 2
thread 2 cost time=10047,queue counter=102181
thread 6 cost time=10045,queue counter=97463
Shutting down Thread: 3
thread 3 cost time=10047,queue counter=101990
top - 11:36:04 up 118 days, 8:22, 4 users, load average: 0.80, 2.21, 3.76
Tasks: 151 total, 1 running, 150 sleeping, 0 stopped, 0 zombie
Cpu0 : 13.3%us, 46.7%sy, 0.0%ni, 40.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 16.2%us, 36.8%sy, 0.0%ni, 47.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 17.3%us, 37.3%sy, 0.0%ni, 44.0%id, 0.0%wa, 0.0%hi, 1.3%si, 0.0%st
Cpu3 : 15.3%us, 37.5%sy, 0.0%ni, 44.4%id, 0.0%wa, 0.0%hi, 2.8%si, 0.0%st
Tasks: 151 total, 1 running, 150 sleeping, 0 stopped, 0 zombie
Cpu0 : 13.3%us, 46.7%sy, 0.0%ni, 40.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 16.2%us, 36.8%sy, 0.0%ni, 47.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 17.3%us, 37.3%sy, 0.0%ni, 44.0%id, 0.0%wa, 0.0%hi, 1.3%si, 0.0%st
Cpu3 : 15.3%us, 37.5%sy, 0.0%ni, 44.4%id, 0.0%wa, 0.0%hi, 2.8%si, 0.0%st
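完成所有消息消费:10个线程的 queue counter 合计正好 1,000,000 条。各线程并行运行,每个线程耗时约10秒,累计线程时间约100秒;实际墙钟时间约10秒(受 main 方法中 Thread.sleep(10000) 的限制)。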
第二次运行:
./run-consumer.sh
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Shutting down Thread: 9
Shutting down Thread: 1
thread 9 cost time=10026,queue counter=0
Shutting down Thread: 4
Shutting down Thread: 0
Shutting down Thread: 8
thread 0 cost time=10037,queue counter=0
thread 4 cost time=10028,queue counter=0
Shutting down Thread: 7
thread 7 cost time=10027,queue counter=0
thread 1 cost time=10027,queue counter=0
thread 8 cost time=10027,queue counter=0
Shutting down Thread: 2
thread 2 cost time=10034,queue counter=0
Shutting down Thread: 6
thread 6 cost time=10028,queue counter=0
Shutting down Thread: 5
thread 5 cost time=10028,queue counter=0
Shutting down Thread: 3
thread 3 cost time=10034,queue counter=0
[INFO] starting ...
SERVER_HOME=/root/kafka-test
-Xms2048M -Xmx2048M -XX:MaxPermSize=1024M -XX:PermSize=512M -XX:+UseG1GC
BOOTCMD=java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
java -cp /root/kafka-test/conf:/root/kafka-test/lib:/root/kafka-test/lib/* kafka.client.ConsumerGroupExample
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Shutting down Thread: 9
Shutting down Thread: 1
thread 9 cost time=10026,queue counter=0
Shutting down Thread: 4
Shutting down Thread: 0
Shutting down Thread: 8
thread 0 cost time=10037,queue counter=0
thread 4 cost time=10028,queue counter=0
Shutting down Thread: 7
thread 7 cost time=10027,queue counter=0
thread 1 cost time=10027,queue counter=0
thread 8 cost time=10027,queue counter=0
Shutting down Thread: 2
thread 2 cost time=10034,queue counter=0
Shutting down Thread: 6
thread 6 cost time=10028,queue counter=0
Shutting down Thread: 5
thread 5 cost time=10028,queue counter=0
Shutting down Thread: 3
thread 3 cost time=10034,queue counter=0
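第二次运行没有消费到任何消息(推测是同一消费组 testgroup 的偏移量已在第一次运行时提交)。但 cost time 仍与第一次相同,说明每个线程约10秒的耗时并不是消费消息本身所需的时间,而是线程 run 方法从开始到结束所经历的时间:main 方法先 Thread.sleep(10000),随后调用 shutdown() 才使各线程退出。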
实验代码地址:
https://code.csdn.net/wh0426/kafkademo/tree/master
8.结论
1.Kafka队列消息插入速度非常快,发布100W请求,耗时150秒。
2.Kafka队列消息消费也非常快:10个线程并行消费100W请求,每个线程约10秒(累计线程时间约100秒,墙钟时间约10秒)
3.Kafka队列支持Partition分区功能,默认按key进行hash分区;也可以自定义实现Partitioner接口,并将partitioner.class属性配置为该自定义类。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic page_visits1
Topic:page_visits1
PartitionCount:10
ReplicationFactor:1
Configs:
Topic: page_visits1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: page_visits1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: page_visits1 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: page_visits1 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: page_visits1 Partition: 4 Leader: 2 Replicas: 2 Isr: 2
Topic: page_visits1 Partition: 5 Leader: 0 Replicas: 0 Isr: 0
Topic: page_visits1 Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: page_visits1 Partition: 7 Leader: 0 Replicas: 0 Isr: 0
Topic: page_visits1 Partition: 8 Leader: 2 Replicas: 2 Isr: 2
Topic: page_visits1 Partition: 9 Leader: 0 Replicas: 0 Isr: 0
4.后续需要测试单点故障,当一台leader kafka 失败后,剩余的follower是否可以继续接管任务。
更多推荐
已为社区贡献6条内容
所有评论(0)