Kafka Cluster Installation

Author: 云计算机网  Category: Cloud Computing Knowledge  Published: 2019-03-17 15:44
Installing the Kafka Cluster

Suppose we have a cluster in which four brokers need to be configured, forming the Kafka cluster shown in the figure below.



2.1 Configuration Files

Configure the Kafka broker configuration file (server.properties) on every broker; the value of broker.id must be a unique integer for each broker. The core properties are as follows:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master

# ZooKeeper cluster
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222


Because Kafka depends on a ZooKeeper cluster, the ZooKeeper cluster must be started first. Its installation is not covered in detail here.
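For completeness, here is a minimal sketch of bringing up the ensemble, assuming ZooKeeper is already installed under /opt/zookeeper on machine-0, machine-1 and machine-2 and that clientPort in zoo.cfg is set to 2222 to match the zookeeper.connect string used below (both are assumptions about your environment):

# Run on each of machine-0, machine-1 and machine-2 (/opt/zookeeper is an assumed install path)
cd /opt/zookeeper
bin/zkServer.sh start     # start the ZooKeeper daemon
bin/zkServer.sh status    # once all three are up, one node reports "leader", the others "follower"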

2.2 Broker Property Configuration

On machine-0 and hadoop-master, set up the corresponding broker configuration files. Apart from broker.id (and host.name), the configuration properties on the two machines are identical; a sketch of the lines that differ on machine-0 follows.
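A minimal illustrative sketch of the machine-0 side; the broker.id value 10 is an assumed example (any integer other than the 11 used on hadoop-master would do):

# machine-0: only these lines differ from the hadoop-master file shown below (illustrative values)
broker.id=10          # must be unique per broker; 10 is an assumed example value
host.name=machine-0   # bind to this machine's own hostname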

On hadoop-master, the Kafka server.properties configuration is:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222
#server.1=machine-0:2888:3888
#server.2=machine-1:2888:3888
#server.3=machine-2:2888:3888

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
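With server.properties in place on each broker and ZooKeeper running, the brokers can be started. The commands below are a minimal sketch, assuming Kafka is installed under /opt/kafka on both machines; the topic name "test", partition count and replication factor are illustrative. On this generation of Kafka the topic tools talk to ZooKeeper (newer releases use --bootstrap-server instead):

# Run on hadoop-master and machine-0, from the Kafka install directory (assumed /opt/kafka)
cd /opt/kafka
nohup bin/kafka-server-start.sh config/server.properties > kafka-server.out 2>&1 &

# Quick sanity check from any machine: create and list a topic through the ZooKeeper ensemble
bin/kafka-topics.sh --create --zookeeper machine-1:2222,machine-2:2222,machine-0:2222 \
  --replication-factor 2 --partitions 3 --topic test
bin/kafka-topics.sh --list --zookeeper machine-1:2222,machine-2:2222,machine-0:2222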
