Apache Kafka是一个分布式流(Streaming)处理平台,Kafka最高支持每秒百万级的消息吞吐量。
Kafka使用“java/Scala”开发,支持集群、支持负载均衡、支持动态扩容(基于Zk),不支持事务。
Kafka与RabbitMQ
- Kafka作为消息传输的数据管道,RabbitMQ作为交易数据的传输管道,Kafka存在有数据丢失的风险。
- RabbitMQ在金融场景经常使用,具有较高的严谨性,数据丢失的可能性跟小,同时具备更高的实时性
- Kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但是依旧不如RabbitMQ严谨
- Kafka需要保证每条消息最少送达一次,有较小的概率出现重复消费的情况
kafka安装
可自己下载后上传到服务器上,也可以通“wget”命令下载
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz
解压并重命名文件
tar -xzvf /srv/ftp/kafka_2.13-2.8.0.tgz -C /usr/local/
mv /usr/local/kafka_2.13-2.8.0.tgz/ /usr/local/kafka
创建数据目录,如果要进行Kafka的配置,那么一定要提供相应的数据目录:需要准备两个目录(zk,logs)
mkdir -p /usr/data/{zookeeper,kafka}
配置Zk,Kafka依赖于Zookeeper组件,但是不建议单独安装zookeeper,而是直接使用Kafka内部提供的Zookeeper组件进行所有信息的注册管理,编辑Zookeeper的配置文件
vim /usr/local/kafka/config/zookeeper.properties
修改:dataDir => dataDir=/usr/data/zookeeper
配置kafka
vim /usr/local/kafka/config/server.properties
修改:log.dirs => log.dirs=/usr/data/kafka/
设置:port => port=9095
设置: advertised.listeners => advertised.listeners=PLAINTEXT://192.168.180.130:9095
修改: zookeeper.connect => zookeeper.connect=localhost:2181(默认提供)
启动服务:kafka依赖于Zookeeper,所以必须先启动Zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
创建主题:Kafka内部提供有主题的直接创建处理操作,创建一个新的主题
创建一个新的主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
查看kafka中所有的主题信息(主题信息保存在ZK中):/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
消息发送:在kafka内部未来方便用户使用消息处理,所有直接提供有消息的收发命令
producer生产者:/usr/local/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.180.130:9095
consumer消费者:/usr/local/kafka/bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.180.130:9095 --from-beginning
“--from-beginning”:表示从头开始消费
使用Java调用kafka
要使用java调用kafka,需要在项目之中引入“kafka-clients”依赖库(要与使用的kafka版本对应),kafka中使用log4j的日志管理机制,所以还需要引入log4j的依赖库
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.1</version>
</dependency>
创建生产者(producer)
public class SendMessage {
/** kafka服务器地址端口*/
private static final String SERVERS = "192.168.180.130:9095";
/** 要进行操作的主题*/
private static final String TOPIC = "happy-topic";
public static void main(String[] args) {
// 需要设置一系列的环境属性
Properties props = new Properties();
// 设置操作主机
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
// 在Kafka里需要明确的设置要传输的数据类型,而且数据类型要分为“key”和“value两个内容”
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 客户端ID
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "sxl.Send");
// 设置消息的发送者,但是需要明确的指派消息的类型,此类型与配置类型完全相同
Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
for (int x = 0; x < 1000; x++) {
producer.send(new ProducerRecord<Integer, String>(TOPIC, x, "kafka数据发送 - " + x));
}
producer.close();
}
}
创建消费者(consumer)
public class ConsumerMessage {
/**
* kafka服务器地址端口
*/
private static final String SERVERS = "192.168.180.130:9095";
/*** 要进行操作的主题*/
private static final String TOPIC = "happy-topic";
public static void main(String[] args) {
Properties props = new Properties();
// 操作主机
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
// 在Kafka里需要明确的设置要传输的数据类型,而且数据类型要分为“key”和“value两个内容”
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 定义消费组
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-lee");
Consumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
// 一个消费者可以处理多个主题的内容
consumer.subscribe(Arrays.asList(TOPIC));
boolean flag = true;
while (flag) {
ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
System.err.println("******************** 消费端获取kafka数据 ********************");
for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
System.out.println("【消费端】key = " + consumerRecord.key() + "、Value = " + consumerRecord.value() + "、offset = " + consumerRecord.offset());
}
}
}
}
消费端(consumer):可以设置消费端每次抓取的个数
// 定义最大抓取个数
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
消费端(consumer):在进行消费处理的时候,实际上包含有应答的处理机制,所谓的应答就是进行偏移量的配置,偏移量的配置需要在消费端中使用“ConsumerConfig.AUTO_OFFSET_RESET_CONFIG”环境属性定义,该环境属性有两个内容:
【earliest】从头消费:
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
【latest】不重复消费:
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
在进行从头消费处理的时候还有一项最为关键的配置“应答处理”,关闭自动应答
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- 所谓的应答机制实际上就是告诉kafka不要再进行该消息的重复消费,可以简单的理解为修改offset偏移量,通过偏移量来保证所消费的全部都是最新的消息内容,但是这样的应答机制大部分环境中都建议使用自动的模式来完成,如果采用自动的模式,那么这里面有一项最为重要的配置选项,就是要设置应答机制的间隔,所有完整的配置为
// latest 不重复消费
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// 开启应答应答
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 设置应答间隔的时间
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
生产端(producer):在消费端使用的批量读取的模式完成的,所有生产者肯定不能一条一条的发送数据,应该考虑数据的批量发送,所有需要进行设置
【发送缓冲】props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10240");
【发送间隔】props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"100");
生产端(producer):如果仅仅只配置了上面上个,也存在一些问题,如果缓冲的内容过多,有可能造成发送的内容过大,所有还需要在配置一个最大请求数据的配置。使用了这个配置,要是达到了最大发送请求的数量,同时又没达到记录的缓冲个数或间隔发送时间,那么消息依然会进行发送处理
props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"1024");
kafka分区处理
在进行数据处理的时候即使主题不存在,kafka也可以自动的进行创建,但是这样的处理机制只在较新的版本中才提供,老版本是没有提供的,在使用老版本的时候主题一定是要先创建的,随后才能在程序中使用。在新版本中也提供了自动删除的配置
分区模式
- 在所有的消费端上都提供有一个消费者的名称,这个组的名称可以由用户随意配置,但是如果此时不同的消费端消费同一个主题,并且处于不同的组里面,那么这种情况称为广播模式,所有的消费端都会重复进行消息内容的处理
- 所有的消费端上如果指定了相同的组名,则描述的是队列模式,即:若干个消费端将进行各自消息的处理,但是如果这个时候没有进行合理的区分设计,则只允许有指定个数的消费端来处理
删除主题:直接通过Kafka提供的命令进行主题的删除
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka-server:9095 --topic happy-topic
创建主题:创建拥有三个分区的主题
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka-server:9095 --replication-factor 1 --partitions 3 --topic happy-topic
JAAS认证
在Kafka里面对于认证提供有两种实现形式:SSL、SASL(JAAS),SSL是一种基于证书的认证服务配置,如果要想使用SSL,则需要有一个公共的CA生成验证证书,随后再利用服务端证书搞到客户端的证书,随后再JDK里面进行依次的配置。这种方式最头疼的问题在于,每一的数据传输都必须进行加密和解密,同时为了解决数据窃听的问题,还需要提供有效验MAC码,这样的方式性能是非常差的,所以未来提升kafka处理性能,采用了SASL的方式来完成。这种方式最大的特点类是于m模拟登录的概念,即:客户端(生产者与消费者)只需要提供认证用户就能进行访问了
如果要想使用JAAS处理操作,则必须依次进行Zookeeper配置、kafkaServer配置、kafkaClient配置。
1、在kafka目录之中创建一个jaas的,这个目录保存有相应的配置项:
mkdir -p /usr/local/kafka/jaas
2、创建zk专属的配置文件
vim /usr/local/kafka/jaas/kafka_zookeeper_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zkadmin"
password="zkadmin-pwd";
};
3、配置jaas:此时的Zookeeper还不认识JAAS这个文件,所以要修改zookeeper启动文件:
vim /usr/local/kafka/bin/zookeeper-server-start.sh
在文件的顶部追加:export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/jaas/kafka_zookeeper_jaas.conf"
4、启动zk】:此时启动zookeeper就存在有JAAS认证处理
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
5、kafka认证文件:创建kafka认证文件
vim /usr/local/kafka/jaas/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zkadmin"
password="zkadmin-pwd"
user_zkadmin="zkadmin-pwd"
user_alice="alice-pwd"
user_bob="bob-pwd";
};
明确描述要跟zk连接:(username="zkadmin" password="zkadmin-pwd"):是配置的zk的账户信息 user_zkadmin="zkadmin-pwd"
后面的才是用户信息:user_alice(用户名)="alice-pwd"(密码)user_bob(用户名)="bob-pwd";(密码)
6、kafka整合JAAS:此时的JAAS文件kafka并不认可,所有还需要修改kafka启动服务文件
vim /usr/local/kafka/bin/kafka-server-start.sh
在文件的顶部追加:export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/jaas/kafka_server_jaas.conf"
7、kafka配置文件:虽然以及进行JAAS定义,但是对于当前的Kafa还未启用SASL认证,所有修改Kafka服务端配置文件。没有配置项添加即可
vim /usr/local/kafka/config/server.properties
port=9095
listeners=SASL_PLAINTEXT://kafka-server:9095
advertised.listeners=SASL_PLAINTEXT://kafka-server:9095
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
在进行启动的时候最好清空这两文件夹
rm -r /usr/data/zookeeper/*
rm -r /usr/data/kafka/*
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
8、程序配置:此时如果要想在程序中访问配置了JAAS认证的kafka服务器,那么最好的做法就是直接进行JAAS安全文件的配置,在磁盘上新建一个文件,保存配置
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="bob"
password="bob-pwd";
};
10、生产者:在程序进行安全访问
static {
// 表示系统环境属性
System.setProperty("java.security.auth.login.config","F:/kafka_client_jaas.conf");
}
props.setProperty(SaslConfigs.SASL_MECHANISM,"PLAIN");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
11、消费者与生产者配置一样,完成之后就可以进行安全的访问了
spring操作kafka
在项目中引入kafka依赖,要与版本对应
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.0</version>
</dependency>
创建kafka-consumer模块
consumer:要进行kafka资源的连接,肯定需要定义一些列的属性文件,定义“resources\kafka\kafka.properties”
# 表示启动SASL认证
kafka.sasl.mechanism=PLAIN
# 表示启动SASL认证
kafka.security.protocol=SASL_PLAINTEXT
# 设置kafka主机端口
kafka.bootstrap.servers=kafka-cluster-a:9095,kafka-cluster-b:9095,kafka-cluster-c:9095
# 设置key的反序列化
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置value的反序列化
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 组
kafka.group.id=group-1
# 主题
kafka.topic=rsthe-hello
# 是否自动提交
kafka.auto.commit=true
# 最大的抓取量
kafka.max.poll.records=10
consumer:kafka存在有JAAS安全认证,所以将JAAS文件也要保存在该目录之中“resources\kafka\kafka_client_jaas.conf”
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="bob"
password="bob-pwd";
};
consumer:要进行kafka的整合还需要定义相关的Spring配置文件“spring-base.xml”,在这个文件中进行一个配置类的加载
<context:component-scan base-package="cn.rsthe.kafka.consumer"/>
consumer:创建一个Kafka的监听程序类(spring整合特色)
public class KafkaMessageListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("【消费端】key = " + record.key() + "、value = " + record.value());
}
}
consumer:创建kafka配置文件类
package cn.rsthe.kafka.consumer.config;
import cn.rsthe.kafka.consumer.listener.KafkaMessageListener;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerConfig {
@Bean("containerProperties")
public ContainerProperties getContainerProperties(
@Value("${kafka.topic}") String topic
) {
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(new KafkaMessageListener());
return containerProperties;
}
@Bean("consumerFactory")
public DefaultKafkaConsumerFactory getConsumerFactory(
// 认证处理
@Value("${kafka.sasl.mechanism}") String saslMechanism,
// 认证处理
@Value("${kafka.security.protocol}") String securityProtocol,
// 连接信息
@Value("${kafka.bootstrap.servers}") String bootstrapServers,
// key反序列化
@Value("${kafka.key.deserializer}") String keyDeserializer,
// value反序列化
@Value("${kafka.value.deserializer}") String valueDeserializer,
// 组
@Value("${kafka.group.id}") String groupId,
// 是否自动提交
@Value("${kafka.auto.commit}") boolean autoCommit,
// 最大抓取数
@Value("${kafka.max.poll.records}") int maxPoll
) {
Map<String, Object> map = new HashMap<>();
map.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
map.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPoll);
DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(map);
return factory;
}
@Bean("messageListenerContainer")
public KafkaMessageListenerContainer getKafkaMessageListenerContainer(
@Autowired DefaultKafkaConsumerFactory consumerFactory,
@Autowired ContainerProperties containerProperties
){
KafkaMessageListenerContainer messageListenerContainer = new KafkaMessageListenerContainer(consumerFactory,containerProperties);
return messageListenerContainer;
}
@Bean("jaasResource")
public Resource getJaasResource(
@Value("${classpath:kafka/kafka_client_jaas.conf}") Resource jaasPath
) throws IOException {
// 表示系统环境属性
System.setProperty("java.security.auth.login.config",
jaasPath.getFile().getPath());
return jaasPath;
}
}
定义producer:生产端直接使用consumer复制
producer:创建kafka配置文件类
package cn.rsthe.kafka.producer.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Configuration
@PropertySource("classpath:kafka/kafka.properties")
public class KafkaProducerConfig {
@Bean("producerFactory")
public DefaultKafkaProducerFactory getProducerFactory(
// 认证处理
@Value("${kafka.sasl.mechanism}") String saslMechanism,
// 认证处理
@Value("${kafka.security.protocol}") String securityProtocol,
// 连接信息
@Value("${kafka.bootstrap.servers}") String bootstrapServers,
// key反序列化
@Value("${kafka.key.serializer}") String keyeserializer,
// value反序列化
@Value("${kafka.value.serializer}") String valueeserializer
) {
Map<String, Object> map = new HashMap<>();
map.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keyeserializer);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueeserializer);
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(map);
return factory;
}
@Bean("kafkaTemplate")
public KafkaTemplate getKafkaTemplate(
@Value("${kafka.topic}") String topic,
@Autowired DefaultKafkaProducerFactory producerFactory
){
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
@Bean("jaasResource")
public Resource getJaasResource(
@Value("${classpath:kafka/kafka_client_jaas.conf}") Resource jaasPath
) throws IOException {
// 表示系统环境属性
System.setProperty("java.security.auth.login.config",
jaasPath.getFile().getPath());
return jaasPath;
}
}
producer:编写业务层接口和实现子类
接口
package cn.rsthe.kafka.producer.service;
public interface IMessageService {
public void sendMessage(String value);
}
实现子类
package cn.rsthe.kafka.producer.service.impl;
import cn.rsthe.kafka.producer.service.IMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageServiceImpl implements IMessageService {
@Autowired
KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String value) {
this.kafkaTemplate.sendDefault("hello-key",value);
}
}
编写生产端、消费端测试类
消费端
@ContextConfiguration(locations = {"classpath:spring/spring-base.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class test {
@Test
public void tetsConsumer() throws Exception{
Thread.sleep(Long.MAX_VALUE);
}
}
生产端
@ContextConfiguration(locations = {"classpath:spring/spring-base.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class test {
@Autowired
private IMessageService messageService;
@Test
public void testProducer() throws Exception{
this.messageService.sendMessage("你好kafka");
}
}