whycxzp
2023-05-11 b0c4d3f59557206ee88ae5af43f4c835e0060149
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#服务端口号
server:
  port: 8777
  serverHeader:
  servlet:
    context-path: /kafka
    session:
      timeout: -1
      cookie:
        name: fg_kafka
  tomcat:
    max-http-form-post-size: 102400000
  message.max.bytes: 156466826
spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: broker-2:9092
#      broker-1:9092
#      broker-2:9092
#      broker-3:9092
#      - 192.168.10.143:9092
#      - 192.168.10.144:9092
#      - 192.168.10.145:9092
    # kafka生产者配置
    producer:
      # 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 3
      # 每次批量发送消息的数量,produce积累到一定数据,一次发数据量
      # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。
      # 这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置(16K)
      batch-size: 400000
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
      buffer-memory: 134217728
      #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,
      #这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
      #如果不开启压缩,可设置为none(默认值),比较大的消息可开启
      compression-type: lz4
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,
      #    无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后
      #    立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,
      #    这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        linger.ms: 20
        # 生产幂等性
        enable.idempotence: true
        # 连接超时时间
        request.timeout.ms: 30000
        max.request.size: 156466826
    consumer:
      group-id: fg #指定消费者组的 group_id
      auto-offset-reset: earliest   #latest 最新的位置 , earliest最早的位置
      auto-commit-interval: 100  #自动提交offset频率 100毫秒
knife:
  enable: true