Kafka

Kafka

小龙 323 2020-02-24

Apache Kafka是一个分布式流(Streaming)处理平台,Kafka最高支持每秒百万级的消息吞吐量。
Kafka使用“java/Scala”开发,支持集群、支持负载均衡、支持动态扩容(基于Zk),不支持事务。

Kafka与RabbitMQ

  • Kafka作为消息传输的数据管道,RabbitMQ作为交易数据的传输管道,Kafka存在有数据丢失的风险。
  • RabbitMQ在金融场景经常使用,具有较高的严谨性,数据丢失的可能性跟小,同时具备更高的实时性
  • Kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但是依旧不如RabbitMQ严谨
  • Kafka需要保证每条消息最少送达一次,有较小的概率出现重复消费的情况

kafka安装

可自己下载后上传到服务器上,也可以通“wget”命令下载

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz

解压并重命名文件

tar -xzvf /srv/ftp/kafka_2.13-2.8.0.tgz -C /usr/local/
mv /usr/local/kafka_2.13-2.8.0.tgz/ /usr/local/kafka

创建数据目录,如果要进行Kafka的配置,那么一定要提供相应的数据目录:需要准备两个目录(zk,logs)

mkdir -p /usr/data/{zookeeper,kafka}

配置Zk,Kafka依赖于Zookeeper组件,但是不建议单独安装zookeeper,而是直接使用Kafka内部提供的Zookeeper组件进行所有信息的注册管理,编辑Zookeeper的配置文件

 vim /usr/local/kafka/config/zookeeper.properties
修改:dataDir => dataDir=/usr/data/zookeeper

配置kafka

 vim /usr/local/kafka/config/server.properties
修改:log.dirs  => log.dirs=/usr/data/kafka/
设置:port  => port=9095
设置: advertised.listeners => advertised.listeners=PLAINTEXT://192.168.180.130:9095
修改: zookeeper.connect => zookeeper.connect=localhost:2181(默认提供)

启动服务:kafka依赖于Zookeeper,所以必须先启动Zookeeper

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 

创建主题:Kafka内部提供有主题的直接创建处理操作,创建一个新的主题

创建一个新的主题:/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
查看kafka中所有的主题信息(主题信息保存在ZK中):/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

消息发送:在kafka内部未来方便用户使用消息处理,所有直接提供有消息的收发命令

producer生产者:/usr/local/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.180.130:9095
consumer消费者:/usr/local/kafka/bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.180.130:9095 --from-beginning
“--from-beginning”:表示从头开始消费

使用Java调用kafka

要使用java调用kafka,需要在项目之中引入“kafka-clients”依赖库(要与使用的kafka版本对应),kafka中使用log4j的日志管理机制,所以还需要引入log4j的依赖库

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.14.1</version>
</dependency>

创建生产者(producer)

public class SendMessage {
    /** kafka服务器地址端口*/
    private static final String SERVERS = "192.168.180.130:9095";
    /** 要进行操作的主题*/
    private static final String TOPIC = "happy-topic";
    public static void main(String[] args) {
        // 需要设置一系列的环境属性
        Properties props = new Properties();
        // 设置操作主机
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // 在Kafka里需要明确的设置要传输的数据类型,而且数据类型要分为“key”和“value两个内容”
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 客户端ID
        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "sxl.Send");
        // 设置消息的发送者,但是需要明确的指派消息的类型,此类型与配置类型完全相同
        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
        for (int x = 0; x < 1000; x++) {
            producer.send(new ProducerRecord<Integer, String>(TOPIC, x, "kafka数据发送 - " + x));
        }
        producer.close();
    }
}

创建消费者(consumer)

public class ConsumerMessage {
    /**
     * kafka服务器地址端口
     */
    private static final String SERVERS = "192.168.180.130:9095";
    /*** 要进行操作的主题*/
    private static final String TOPIC = "happy-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        // 操作主机
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // 在Kafka里需要明确的设置要传输的数据类型,而且数据类型要分为“key”和“value两个内容”
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 定义消费组
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-lee");
        Consumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
        // 一个消费者可以处理多个主题的内容
        consumer.subscribe(Arrays.asList(TOPIC));
        boolean flag = true;
        while (flag) {
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            System.err.println("******************** 消费端获取kafka数据 ********************");
            for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
                System.out.println("【消费端】key = " + consumerRecord.key() + "、Value = " + consumerRecord.value() + "、offset = " + consumerRecord.offset());
            }
        }
    }
}

消费端(consumer):可以设置消费端每次抓取的个数

// 定义最大抓取个数
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");

消费端(consumer):在进行消费处理的时候,实际上包含有应答的处理机制,所谓的应答就是进行偏移量的配置,偏移量的配置需要在消费端中使用“ConsumerConfig.AUTO_OFFSET_RESET_CONFIG”环境属性定义,该环境属性有两个内容:
【earliest】从头消费:

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

【latest】不重复消费:

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

在进行从头消费处理的时候还有一项最为关键的配置“应答处理”,关闭自动应答

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
  • 所谓的应答机制实际上就是告诉kafka不要再进行该消息的重复消费,可以简单的理解为修改offset偏移量,通过偏移量来保证所消费的全部都是最新的消息内容,但是这样的应答机制大部分环境中都建议使用自动的模式来完成,如果采用自动的模式,那么这里面有一项最为重要的配置选项,就是要设置应答机制的间隔,所有完整的配置为
// latest 不重复消费  
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
// 开启应答应答  
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 设置应答间隔的时间
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");

生产端(producer):在消费端使用的批量读取的模式完成的,所有生产者肯定不能一条一条的发送数据,应该考虑数据的批量发送,所有需要进行设置

【发送缓冲】props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10240");
【发送间隔】props.setProperty(ProducerConfig.LINGER_MS_CONFIG,"100");

生产端(producer):如果仅仅只配置了上面上个,也存在一些问题,如果缓冲的内容过多,有可能造成发送的内容过大,所有还需要在配置一个最大请求数据的配置。使用了这个配置,要是达到了最大发送请求的数量,同时又没达到记录的缓冲个数或间隔发送时间,那么消息依然会进行发送处理

props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"1024");

kafka分区处理

在进行数据处理的时候即使主题不存在,kafka也可以自动的进行创建,但是这样的处理机制只在较新的版本中才提供,老版本是没有提供的,在使用老版本的时候主题一定是要先创建的,随后才能在程序中使用。在新版本中也提供了自动删除的配置

分区模式
  • 在所有的消费端上都提供有一个消费者的名称,这个组的名称可以由用户随意配置,但是如果此时不同的消费端消费同一个主题,并且处于不同的组里面,那么这种情况称为广播模式,所有的消费端都会重复进行消息内容的处理
  • 所有的消费端上如果指定了相同的组名,则描述的是队列模式,即:若干个消费端将进行各自消息的处理,但是如果这个时候没有进行合理的区分设计,则只允许有指定个数的消费端来处理

删除主题:直接通过Kafka提供的命令进行主题的删除

 /usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka-server:9095 --topic happy-topic

创建主题:创建拥有三个分区的主题

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka-server:9095 --replication-factor 1 --partitions 3 --topic happy-topic

JAAS认证

在Kafka里面对于认证提供有两种实现形式:SSL、SASL(JAAS),SSL是一种基于证书的认证服务配置,如果要想使用SSL,则需要有一个公共的CA生成验证证书,随后再利用服务端证书搞到客户端的证书,随后再JDK里面进行依次的配置。这种方式最头疼的问题在于,每一的数据传输都必须进行加密和解密,同时为了解决数据窃听的问题,还需要提供有效验MAC码,这样的方式性能是非常差的,所以未来提升kafka处理性能,采用了SASL的方式来完成。这种方式最大的特点类是于m模拟登录的概念,即:客户端(生产者与消费者)只需要提供认证用户就能进行访问了
如果要想使用JAAS处理操作,则必须依次进行Zookeeper配置、kafkaServer配置、kafkaClient配置。
1、在kafka目录之中创建一个jaas的,这个目录保存有相应的配置项:

mkdir -p /usr/local/kafka/jaas

2、创建zk专属的配置文件

vim /usr/local/kafka/jaas/kafka_zookeeper_jaas.conf
Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="zkadmin"
        password="zkadmin-pwd";
};

3、配置jaas:此时的Zookeeper还不认识JAAS这个文件,所以要修改zookeeper启动文件:

vim /usr/local/kafka/bin/zookeeper-server-start.sh
在文件的顶部追加:export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/jaas/kafka_zookeeper_jaas.conf"

4、启动zk】:此时启动zookeeper就存在有JAAS认证处理

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties

5、kafka认证文件:创建kafka认证文件

vim /usr/local/kafka/jaas/kafka_server_jaas.conf
KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="zkadmin"
        password="zkadmin-pwd"
        user_zkadmin="zkadmin-pwd"
        user_alice="alice-pwd"
        user_bob="bob-pwd";
};
	明确描述要跟zk连接:(username="zkadmin" password="zkadmin-pwd"):是配置的zk的账户信息 user_zkadmin="zkadmin-pwd"
	后面的才是用户信息:user_alice(用户名)="alice-pwd"(密码)user_bob(用户名)="bob-pwd";(密码)

6、kafka整合JAAS:此时的JAAS文件kafka并不认可,所有还需要修改kafka启动服务文件

vim /usr/local/kafka/bin/kafka-server-start.sh 
在文件的顶部追加:export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/jaas/kafka_server_jaas.conf"

7、kafka配置文件:虽然以及进行JAAS定义,但是对于当前的Kafa还未启用SASL认证,所有修改Kafka服务端配置文件。没有配置项添加即可

vim /usr/local/kafka/config/server.properties

port=9095
listeners=SASL_PLAINTEXT://kafka-server:9095
advertised.listeners=SASL_PLAINTEXT://kafka-server:9095
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

在进行启动的时候最好清空这两文件夹

 rm -r /usr/data/zookeeper/*
 rm -r /usr/data/kafka/*
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

8、程序配置:此时如果要想在程序中访问配置了JAAS认证的kafka服务器,那么最好的做法就是直接进行JAAS安全文件的配置,在磁盘上新建一个文件,保存配置

KafkaClient {  
        org.apache.kafka.common.security.plain.PlainLoginModule required  
        username="bob"  
        password="bob-pwd";  
};  

10、生产者:在程序进行安全访问

static {
        // 表示系统环境属性
        System.setProperty("java.security.auth.login.config","F:/kafka_client_jaas.conf");
    }
props.setProperty(SaslConfigs.SASL_MECHANISM,"PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");

11、消费者与生产者配置一样,完成之后就可以进行安全的访问了

spring操作kafka

在项目中引入kafka依赖,要与版本对应

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.13.0</version>
</dependency>

创建kafka-consumer模块

consumer:要进行kafka资源的连接,肯定需要定义一些列的属性文件,定义“resources\kafka\kafka.properties”

# 表示启动SASL认证
kafka.sasl.mechanism=PLAIN
# 表示启动SASL认证
kafka.security.protocol=SASL_PLAINTEXT
# 设置kafka主机端口
kafka.bootstrap.servers=kafka-cluster-a:9095,kafka-cluster-b:9095,kafka-cluster-c:9095
# 设置key的反序列化
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置value的反序列化
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 组
kafka.group.id=group-1
# 主题
kafka.topic=rsthe-hello
# 是否自动提交
kafka.auto.commit=true
# 最大的抓取量
kafka.max.poll.records=10

consumer:kafka存在有JAAS安全认证,所以将JAAS文件也要保存在该目录之中“resources\kafka\kafka_client_jaas.conf”

KafkaClient {  
        org.apache.kafka.common.security.plain.PlainLoginModule required  
        username="bob"  
        password="bob-pwd";  
};

consumer:要进行kafka的整合还需要定义相关的Spring配置文件“spring-base.xml”,在这个文件中进行一个配置类的加载

<context:component-scan base-package="cn.rsthe.kafka.consumer"/>

consumer:创建一个Kafka的监听程序类(spring整合特色)

public class KafkaMessageListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("【消费端】key = " + record.key() + "、value = " + record.value());
    }
}

consumer:创建kafka配置文件类

package cn.rsthe.kafka.consumer.config;
import cn.rsthe.kafka.consumer.listener.KafkaMessageListener;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerConfig {
    @Bean("containerProperties")
    public ContainerProperties getContainerProperties(
            @Value("${kafka.topic}") String topic
    ) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(new KafkaMessageListener());
        return containerProperties;
    }
    @Bean("consumerFactory")
    public DefaultKafkaConsumerFactory getConsumerFactory(
            // 认证处理
            @Value("${kafka.sasl.mechanism}") String saslMechanism,
            // 认证处理
            @Value("${kafka.security.protocol}") String securityProtocol,
            // 连接信息
            @Value("${kafka.bootstrap.servers}") String bootstrapServers,
            // key反序列化
            @Value("${kafka.key.deserializer}") String keyDeserializer,
            // value反序列化
            @Value("${kafka.value.deserializer}") String valueDeserializer,
            // 组
            @Value("${kafka.group.id}") String groupId,
            // 是否自动提交
            @Value("${kafka.auto.commit}") boolean autoCommit,
            // 最大抓取数
            @Value("${kafka.max.poll.records}") int maxPoll
    ) {
        Map<String, Object> map = new HashMap<>();
        map.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        map.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
        map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPoll);
        DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(map);
        return factory;
    }
    @Bean("messageListenerContainer")
    public KafkaMessageListenerContainer getKafkaMessageListenerContainer(
            @Autowired DefaultKafkaConsumerFactory consumerFactory,
            @Autowired ContainerProperties containerProperties
    ){
        KafkaMessageListenerContainer messageListenerContainer = new KafkaMessageListenerContainer(consumerFactory,containerProperties);
        return messageListenerContainer;
    }
    @Bean("jaasResource")
    public Resource getJaasResource(
            @Value("${classpath:kafka/kafka_client_jaas.conf}") Resource jaasPath
    ) throws IOException {
        // 表示系统环境属性
        System.setProperty("java.security.auth.login.config",
                jaasPath.getFile().getPath());
        return jaasPath;
    }
}

定义producer:生产端直接使用consumer复制

producer:创建kafka配置文件类

package cn.rsthe.kafka.producer.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Configuration
@PropertySource("classpath:kafka/kafka.properties")
public class KafkaProducerConfig {
    @Bean("producerFactory")
    public DefaultKafkaProducerFactory getProducerFactory(
            // 认证处理
            @Value("${kafka.sasl.mechanism}") String saslMechanism,
            // 认证处理
            @Value("${kafka.security.protocol}") String securityProtocol,
            // 连接信息
            @Value("${kafka.bootstrap.servers}") String bootstrapServers,
            // key反序列化
            @Value("${kafka.key.serializer}") String keyeserializer,
            // value反序列化
            @Value("${kafka.value.serializer}") String valueeserializer
    ) {
        Map<String, Object> map = new HashMap<>();
        map.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keyeserializer);
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueeserializer);
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(map);
        return factory;
    }
    @Bean("kafkaTemplate")
    public KafkaTemplate getKafkaTemplate(
            @Value("${kafka.topic}") String topic,
            @Autowired DefaultKafkaProducerFactory producerFactory
    ){
        KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
        kafkaTemplate.setDefaultTopic(topic);
        return kafkaTemplate;
    }
    @Bean("jaasResource")
    public Resource getJaasResource(
            @Value("${classpath:kafka/kafka_client_jaas.conf}") Resource jaasPath
    ) throws IOException {
        // 表示系统环境属性
        System.setProperty("java.security.auth.login.config",
                jaasPath.getFile().getPath());
        return jaasPath;
    }
}

producer:编写业务层接口和实现子类

接口
package cn.rsthe.kafka.producer.service;
public interface IMessageService {
    public void sendMessage(String value);
}
实现子类
package cn.rsthe.kafka.producer.service.impl;
import cn.rsthe.kafka.producer.service.IMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class MessageServiceImpl implements IMessageService {
    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;
    @Override
    public void sendMessage(String value) {
        this.kafkaTemplate.sendDefault("hello-key",value);
    }
}

编写生产端、消费端测试类

消费端
@ContextConfiguration(locations = {"classpath:spring/spring-base.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class test {
    @Test
    public void tetsConsumer() throws Exception{
        Thread.sleep(Long.MAX_VALUE);
    }
}
生产端
@ContextConfiguration(locations = {"classpath:spring/spring-base.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class test {
    @Autowired
    private IMessageService messageService;
    @Test
    public void testProducer() throws Exception{
        this.messageService.sendMessage("你好kafka");
    }
}

# kafka # 消息中间件 # Streaming