介绍
http://kafka.apache.org
kafka是一种高吞吐量的分布式发布订阅消息系统
kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)
当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。
高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理
测试环境
kafka_2.10-0.8.1.1 3个节点做的集群
zookeeper-3.4.5 一个实例节点
代码示例
消息生产者代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
* @author Fung
*
*/
public class ProducerDemo {
public static void main(String[] args) {
Random rnd = new Random();
int events= 100 ;
// 设置配置属性
Properties props = new Properties();
props.put( "metadata.broker.list" , "172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092" );
props.put( "serializer.class" , "kafka.serializer.StringEncoder" );
// key.serializer.class默认为serializer.class
props.put( "key.serializer.class" , "kafka.serializer.StringEncoder" );
// 可选配置,如果不配置,则使用默认的partitioner
props.put( "partitioner.class" , "com.catt.kafka.demo.PartitionerDemo" );
// 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
// 值为0,1,-1,可以参考
// http://kafka.apache.org/08/configuration.html
props.put( "request.required.acks" , "1" );
ProducerConfig config = new ProducerConfig(props);
// 创建producer
Producer<String, String> producer = new Producer<String, String>(config);
// 产生并发送消息
long start=System.currentTimeMillis();
for ( long i = 0 ; i < events; i++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + i; //rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
//如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
KeyedMessage<String, String> data = new KeyedMessage<String, String>(
"page_visits" , ip, msg);
producer.send(data);
}
System.out.println( "耗时:" + (System.currentTimeMillis() - start));
// 关闭producer
producer.close();
}
} |
消息消费者代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
*
* @author Fung
*
*/
public class ConsumerDemo {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
this .topic = a_topic;
}
public void shutdown() {
if (consumer != null )
consumer.shutdown();
if (executor != null )
executor.shutdown();
}
public void run( int numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(numThreads));
Map<String, List<KafkaStream< byte [], byte []>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream< byte [], byte []>> streams = consumerMap.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0 ;
for ( final KafkaStream stream : streams) {
executor.submit( new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {
Properties props = new Properties();
props.put( "zookeeper.connect" , a_zookeeper);
props.put( "group.id" , a_groupId);
props.put( "zookeeper.session.timeout.ms" , "400" );
props.put( "zookeeper.sync.time.ms" , "200" );
props.put( "auto.commit.interval.ms" , "1000" );
return new ConsumerConfig(props);
}
public static void main(String[] arg) {
String[] args = { "172.168.63.221:2188" , "group-1" , "page_visits" , "12" };
String zooKeeper = args[ 0 ];
String groupId = args[ 1 ];
String topic = args[ 2 ];
int threads = Integer.parseInt(args[ 3 ]);
ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
demo.run(threads);
try {
Thread.sleep( 10000 );
} catch (InterruptedException ie) {
}
demo.shutdown();
}
} |
消息处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() {
ConsumerIterator< byte [], byte []> it = m_stream.iterator();
while (it.hasNext())
System.out.println( "Thread " + m_threadNumber + ": "
+ new String(it.next().message()));
System.out.println( "Shutting down Thread: " + m_threadNumber);
}
} |
Partitioner类示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerDemo implements Partitioner {
public PartitionerDemo(VerifiableProperties props) {
}
@Override
public int partition(Object obj, int numPartitions) {
int partition = 0 ;
if (obj instanceof String) {
String key=(String)obj;
int offset = key.lastIndexOf( '.' );
if (offset > 0 ) {
partition = Integer.parseInt(key.substring(offset + 1 )) % numPartitions;
}
} else {
partition = obj.toString().length() % numPartitions;
}
return partition;
}
} |
pom.xml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
<project xmlns= "http://maven.apache.org/POM/4.0.0" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion> 4.0 . 0 </modelVersion>
<groupId>com.xxx</groupId>
<artifactId>kafka-demo</artifactId>
<version> 0.0 . 1 -SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<url>http: //maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF- 8 </project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2. 10 </artifactId>
<version> 0.8 . 1.1 </version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version> 1.2 . 15 </version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>mail</artifactId>
<groupId>javax.mail</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version> 4.11 </version>
<scope>test</scope>
</dependency>
</dependencies>
</project> |
相关推荐
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
本篇文章主要介绍了Kafka使用Java客户端进行访问的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
通过对Kafka生产者和消费者的代码示例分析,文档展示了消息的发送和接收过程,同时探讨了Kafka集群的搭建和运维,包括主题创建、分区管理、消息复制和容错处理等核心功能。此外,本文还探讨了Kafka的Java客户端访问...
longlang / phpkafka 介绍 English | PHP Kafka客户端用于PHP-FPM和Swoole。 通信协议基于Java中的JSON文件。 PHP Kafka客户端支持50个API,这可能是有史以来支持最多消息类型的API。...请参考examples代码示例。
:示例Kafka客户端应用程序,例如Java生产者应用程序(用于将数据写入Kafka)和Java Consumer应用程序(用于从Kafka读取数据) :这些示例演示了将数据获取到Kafka主题,加载到Kafka Streams API KStream各种方法...
启动动物园管理员 ./bin/zookeeper-server-start.sh config/zookeeper.properties ... ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic example
Apache Kafka基础知识和在JAVA中实现的迷你项目(Twitter Sreaming_Producer和Consumer配置)的工作示例。 该项目是根据而开发的。 我强烈建议他在Udemy上他的课程,以帮助您了解Apache Kafka生态系统,体系结构,...
它不是在Java客户端上建模的。 所有API调用都是同步的,并且所有代码都在调用goroutine中执行。 软件包libkafka是一个低级库,用于从Kafka 2.3+生成和使用。 它没有外部依赖性。 它不是在Java客户端上建模的。 所有...
春天卡夫卡使用Spring Kafka项目(带有Spring Boot的基于Maven的项目结构)发送或接收带有Kafka主题的字符串或JSON消息的示例Kafka客户端代码段Spring数据用于使用标准Spring JPA和Spring Data JPA(不带Spring Boot...
在此存储库中,有一系列使用Spark,Hive / Impala和Kafka处理收集的数据的示例: : 。特征单一标签站点集成:包括Divolte Collector是HTML的单一代码。 只需在文档正文末尾加载JavaScript。 专为Hadoop和Kafka构建...
通过使用监视组件,可以抽象出大多数用于监视微服务的粘合代码。它使用Micrometer , Prometheus , Grafana和Spring-Boot-Actuator来实现对MicroServices的全面监视,这是从此框架扩展而来的。 samplemicrosvc-一...