This article is reprinted from the WeChat public account StreamCloudNative. The author is Xue Song, a senior software engineer at New Continent Software.
Editor: Chicken Steak, StreamNative.
About Apache Pulsar
Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight functional computing. It adopts an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and multi-datacenter cross-region data replication, offering strong consistency, high throughput, low latency, and high scalability.
At present, Apache Pulsar has been adopted by many large Internet and traditional-industry companies at home and abroad, with use cases spanning artificial intelligence, finance, telecom operators, live and short-form video, the Internet of Things, retail and e-commerce, online education, and other industries. Adopters include Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.
Background
As a cloud-native distributed messaging system, Apache Pulsar comprises multiple components, including ZooKeeper, bookies, brokers, functions workers, and proxies, all deployed across multiple hosts. As a result, each component's log files are scattered over many machines. When a component has a problem, finding the relevant error message means logging in to each host and inspecting each service individually, which is tedious. Our usual approach was to grep the log files directly, or extract the desired information with commands such as awk. However, as applications and traffic grew, so did the number of nodes, and this traditional method exposed many problems: low efficiency, excessive log archives, slow full-text search, and no support for multi-dimensional queries. We therefore wanted to aggregate and monitor the logs so that we could quickly locate the error messages of each Pulsar service and troubleshoot them, making operations and maintenance more purposeful, targeted, and direct.
To solve the log-retrieval problem, our team decided to use a centralized log collection system to uniformly collect, manage, and access the logs on all Pulsar nodes.
A complete centralized logging system should provide the following capabilities:
- Collection – can collect log data from multiple sources;
- Transport – can stably transfer log data to the central system;
- Storage – can store log data reliably;
- Analysis – supports analysis through a UI;
- Alerting – provides error reporting and monitoring mechanisms.
ELK provides a complete, fully open-source solution whose components work together seamlessly and cover many scenarios efficiently; it is currently a mainstream logging stack. Our company has a self-developed big-data management platform, through which we deploy and manage ELK, and we already use ELK to support multiple business systems in production. ELK is the abbreviation of three open-source projects: Elasticsearch, Logstash, and Kibana. The latest version has been renamed Elastic Stack and adds Beats, which includes Filebeat, a lightweight log collection and processing tool. Filebeat consumes few resources and is well suited to collecting logs from servers and forwarding them to Logstash.
As you can see in the figure above, there are two problems if Pulsar uses this log collection mode:
- Every host running a Pulsar service must also run a Filebeat service.
- Pulsar service logs must first be written to disk as files, consuming host disk I/O.
Log4j2 supports sending logs to Kafka out of the box through its KafkaAppender: by editing the Log4j2 configuration file, logs produced through Log4j2 can be shipped to Kafka in real time.
As shown below:
Implementation process
Taking Pulsar 2.6.2 as an example, this section introduces in detail how Apache Pulsar implements fast log retrieval based on Log4j2 + Kafka + ELK.
I. Preparation
The first thing to determine is which fields will be used to retrieve logs in Kibana, so that logs can be aggregated and queried across multiple dimensions. Elasticsearch then tokenizes and builds indexes on these retrieval fields.
As shown in the figure above, we set up 8 retrieval fields for Pulsar logs, namely: cluster name, host name, host IP, component name, log content, system time, log level, and cluster instance.
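As an illustrative sketch (not part of Pulsar itself), the line that eventually reaches Elasticsearch is simply these fields joined with spaces, mirroring the Log4j2 pattern defined later in log4j2-kafka.yaml; the function name and all sample values below are invented for the example, and the thread/class fields of the real pattern are omitted for brevity:

```python
# Illustrative sketch: assemble a log line in the same space-separated
# shape that the Log4j2 PatternLayout produces. All values are examples.
def format_log_line(cluster, hostname, hostip, module, instance_id,
                    timestamp, level, message):
    # cluster hostname hostip module instance timestamp level , message
    return (f"{cluster} {hostname} {hostip} {module} {instance_id} "
            f"{timestamp} {level} , {message}")

line = format_log_line("pulsar-cluster", "node-1", "192.168.0.1",
                       "broker", "1", "2020-12-26 17:40:14.363",
                       "INFO", "GET /metrics HTTP/1.1 200")
print(line)
```

Because the delimiter is a plain space, Elasticsearch's default tokenizer splits each of these fields into its own searchable term.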
Ii. Implementation process
Note: In order to keep the structure of Pulsar’s native configuration files and scripts intact, we implemented this solution by adding new configuration files and scripts.
1. Add the configuration files
Add the following two configuration files to the {PULSAR_HOME}/conf directory:
1) logenv.sh — this file configures the JVM options that each Pulsar component needs at startup into the Pulsar service's Java process, for example:
KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar-cluster
PULSAR_TOPIC=pulsar-topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1
The meanings of the above fields are:
- KAFKA_CLUSTER: Kafka broker list address;
- PULSAR_CLUSTER: cluster name of Pulsar;
- PULSAR_TOPIC: the Topic in Kafka used to access the Pulsar service log;
- HOST_IP: IP address of the Pulsar host.
- PULSAR_MODULE_INSTANCE_ID: indicates the instance ID of the Pulsar service. Multiple Pulsar clusters may be deployed on a host.
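To show how these variables reach the logging configuration, here is a small sketch of the mapping from logenv.sh variables to the -D system properties that the pulsar-kafka script later appends to OPTS (the mapping follows the OPTS assembly shown in that script; the function itself is illustrative, not part of Pulsar):

```python
# Illustrative sketch: map each logenv.sh variable to the JVM system
# property that log4j2-kafka.yaml reads via ${sys:...}. Values are examples.
def build_log_opts(env):
    mapping = {
        "KAFKA_CLUSTER": "kafka.cluster",
        "PULSAR_CLUSTER": "pulsar.cluster",
        "PULSAR_TOPIC": "pulsar.topic",
        "HOST_IP": "pulsar.hostip",
        "PULSAR_MODULE_INSTANCE_ID": "pulsar.module.instanceid",
    }
    # Produce "-Dprop=value" pairs, skipping variables that are not set.
    return " ".join(f"-D{prop}={env[var]}"
                    for var, prop in mapping.items() if var in env)

opts = build_log_opts({
    "KAFKA_CLUSTER": "192.168.0.1:9092",
    "PULSAR_CLUSTER": "pulsar-cluster",
    "PULSAR_TOPIC": "pulsar-topic",
    "HOST_IP": "192.168.0.1",
    "PULSAR_MODULE_INSTANCE_ID": "1",
})
print(opts)
```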
2) log4j2-kafka.yaml
This file is copied from log4j2.yaml, with the following changes: (Note: in the figure below, log4j2.yaml is on the left and log4j2-kafka.yaml is on the right.)
- Add the Kafka cluster broker list and define the format of the log messages Log4j2 sends to Kafka; Elasticsearch uses spaces to split out the eight retrieval fields.
- Add the Kafka appender;
- Add a Failover appender;
- Switch the Root and Logger entries under Loggers to asynchronous mode;
- The complete log4j2-kafka.yaml configuration file is as follows:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2

  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"

  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8

  Appenders:
    # Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"

    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"

    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"

    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
        Delete:
          basePath: ${sys:pulsar.log.dir}
          maxDepth: 2
          IfFileName:
            glob: "*/${sys:pulsar.log.file}*log.gz"
          IfLastModified:
            age: 30d

    Prometheus:
      name: Prometheus

    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          -
            Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  -
                    RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern : "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                        Delete:
                          basePath: ${sys:pulsar.log.dir}
                          maxDepth: 2
                          IfFileName:
                            glob: "*/${sys:pulsar.log.file}*log.gz"
                          IfLastModified:
                            age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"

  Loggers:
    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info

    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console

      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console

    # Logger to inject filter script
#    - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
#      level: debug
#      additivity: false
#      AppenderRef:
#        ref: "${sys:pulsar.log.appender}"
#        ScriptFilter:
#          onMatch: ACCEPT
#          onMisMatch: DENY
#          ScriptRef:
#            ref: filter.js
Notes:
- Log shipping must be asynchronous and must not affect service performance.
- When a system with strict response-time requirements integrates with a third-party system, the dependency must be decoupled. Here, the Failover appender decouples the dependency on Kafka: when Kafka crashes, logging fails over and writes to local files instead.
- The default retryIntervalSeconds of the Log4j2 Failover appender is 1 minute. Since failures are detected through exceptions anyway, you can increase the interval, for example to 10 minutes.
- ignoreExceptions of the Kafka appender must be set to false; otherwise Failover cannot be triggered.
- When the Kafka cluster is down, an attempted write by the Kafka client blocks until max.block.ms elapses before throwing an exception, and only then does Failover trigger. With the default of 1 minute, a high request rate quickly fills the Log4j2 async queue, after which log writes block and severely affect the main service's response time. Therefore max.block.ms should be set to a sufficiently small value, such as the 2000 ms used above, while the Failover retry interval can stay long.
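To make the failover behavior concrete, here is a minimal, hypothetical sketch of the pattern (in Python, not real Log4j2 or Kafka APIs): try the primary sink, and fall back to a local file when it fails. The sink classes and the KafkaDown error are invented stand-ins:

```python
# Hypothetical sketch of the Failover-appender pattern: attempt the primary
# sink (Kafka); on failure, write the event to a local fallback instead.
class KafkaDown(Exception):
    pass

class PrimarySink:
    def __init__(self, healthy=True):
        self.healthy = healthy
        self.events = []
    def write(self, event):
        if not self.healthy:
            # Mirrors the Kafka client raising after max.block.ms elapses.
            raise KafkaDown("broker unreachable")
        self.events.append(event)

class FailoverSink:
    def __init__(self, primary, fallback):
        self.primary = primary
        self.fallback = fallback  # stand-in for a RollingFile-like local sink
    def write(self, event):
        try:
            # ignoreExceptions=false: errors propagate so failover can trigger.
            self.primary.write(event)
        except KafkaDown:
            self.fallback.append(event)  # failover: keep the log locally

primary = PrimarySink(healthy=False)
local_file = []
sink = FailoverSink(primary, local_file)
sink.write("broker INFO , GET /metrics 200")
print(local_file)
```

The key design point the sketch mirrors is that the failure must surface quickly (short max.block.ms) so the fallback path engages before the async queue fills.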
2. Add the script files
Add the following two scripts to the {PULSAR_HOME}/bin directory:
1) pulsar-kafka — this script file is copied from the pulsar script, with the following changes: (Note: in the figure below, pulsar is on the left and pulsar-kafka is on the right.)
- Specify log4j2-kafka.yaml as the log configuration file;
- Add code to read logenv.sh;
- Add OPTS entries so that the JVM options are passed to the Java process when a Pulsar component is started via the pulsar-kafka and pulsar-daemon-kafka scripts;
- The complete contents of the pulsar-kafka script file are as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto
# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto
# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
. "$PULSAR_HOME/conf/bkenv.sh"
fi
# Check pulsar env and load pulser_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_env.sh"
fi
if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
. "$PULSAR_HOME/conf/logenv.sh"
fi
# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
JAVA=$(which java)
if [ $? != 0 ]; then
echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
exit 1
fi
else
JAVA=$JAVA_HOME/bin/java
fi
# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
PULSAR_JAR=$RELEASE_JAR
fi
# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
echo "\nCouldn't find pulsar jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
elif [ -e "$BUILT_JAR" ]; then
PULSAR_JAR=$BUILT_JAR
fi
#
# find the instance locations for pulsar-functions
#
# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
# didn't find a released jar, then search the built jar
BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
echo "\nCouldn't find pulsar-functions java instance jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi
# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
# didn't find a released python instance, then search the built python instance
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
echo "\nCouldn't find pulsar-functions python instance.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi
# find pulsar sql presto distribution location
check_presto_libraries() {
if [ ! -d "${PRESTO_HOME}" ]; then
BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
echo "\nCouldn't find presto distribution.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
PRESTO_HOME=${BUILT_PRESTO_HOME}
fi
}
pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
functions-worker Run a functions worker server
sql-worker Run a sql worker server
sql Run sql CLI
standalone Run a broker server with local bookies and local zookeeper
initialize-cluster-metadata One-time metadata initialization
delete-cluster-metadata Delete a cluster's metadata
initialize-transaction-coordinator-metadata One-time transaction coordinator metadata initialization
initialize-namespace namespace initialization
compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
broker-tool CLI to operate a specific broker
tokens Utility to create authentication tokens
help This help message
or command is the full name of a class with a defined main() method.
Environment variables:
PULSAR_LOG_CONF Log4j configuration file (default $DEFAULT_LOG_CONF)
PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF)
PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
PULSAR_STANDALONE_CONF Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
PULSAR_PRESTO_CONF Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
PULSAR_PID_DIR Folder where the pulsar server PID file should be stored
PULSAR_STOP_TIMEOUT Wait time before forcefully kill the pulsar server instance, if the stop is not successful
These variable can also be set in conf/pulsar_env.sh
EOF
}
add_maven_deps_to_classpath() {
MVN="mvn"
if [ "$MAVEN_HOME" != "" ]; then
MVN=${MAVEN_HOME}/bin/mvn
fi
# Need to generate classpath from maven pom. This is costly so generate it
# and cache it. Save the file into our target dir so a mvn clean will get
# clean it up and force us create a new one.
f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
if [ ! -f "${f}" ]
then
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
add_maven_deps_to_classpath
ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi
ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"
# if no args specified, show usage
if [ $# = 0 ]; then
pulsar_help;
exit 1;
fi
# get arguments
COMMAND=$1
shift
if [ -z "$PULSAR_WORKER_CONF" ]; then
PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi
if [ -z "$PULSAR_BROKER_CONF" ]; then
PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi
if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi
if [ -z "$PULSAR_ZK_CONF" ]; then
PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi
if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi
if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi
if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi
if [ -z "$PULSAR_PROXY_CONF" ]; then
PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi
if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi
if [ -z "$PULSAR_STANDALONE_CONF" ]; then
PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi
if [ -z "$PULSAR_LOG_CONF" ]; then
PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi
if [ -z "$PULSAR_PRESTO_CONF" ]; then
PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi
PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"
OPTS="-cp $PULSAR_CLASSPATH $OPTS"
OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"
# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
# Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
check_presto_libraries
exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
check_presto_libraries
exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
pulsar_help;
else
echo ""
echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
echo ""
exit 1
fi
2) pulsar-daemon-kafka
This script file is copied from the pulsar-daemon script, with the following changes: (Note: in the figure below, pulsar-daemon is on the left and pulsar-daemon-kafka is on the right.)
- Add code to read logenv.sh;
- Invoke pulsar-kafka instead of pulsar;
- The complete contents of the pulsar-daemon-kafka script are as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

usage() {
    cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
    broker              Run a broker server
    bookie              Run a bookie server
    zookeeper           Run a zookeeper server
    configuration-store Run a configuration-store server
    discovery           Run a discovery server
    websocket           Run a websocket proxy server
    functions-worker    Run a functions worker server
    standalone          Run a standalone Pulsar service
    proxy               Run a proxy Pulsar service

where argument is one of:
    -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}

BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/.. ; pwd)

# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
    . "$PULSAR_HOME/conf/bkenv.sh"
fi

if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
    . "$PULSAR_HOME/conf/pulsar_env.sh"
fi

if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
    . "$PULSAR_HOME/conf/logenv.sh"
fi

PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}

if [ $# = 0 ]; then
    usage
    exit 1
elif [ $# = 1 ]; then
    if [ $1 == "--help" -o $1 == "-h" ]; then
        usage
        exit 1
    else
        echo "Error: no enough arguments provided."
        usage
        exit 1
    fi
fi

startStop=$1
shift
command=$1
shift

case $command in
    (broker)
        echo "doing $startStop $command ..."
        ;;
    (bookie)
        echo "doing $startStop $command ..."
        ;;
    (zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (global-zookeeper)
        echo "doing $startStop $command ..."
        ;;
    (configuration-store)
        echo "doing $startStop $command ..."
        ;;
    (discovery)
        echo "doing $startStop $command ..."
        ;;
    (websocket)
        echo "doing $startStop $command ..."
        ;;
    (functions-worker)
        echo "doing $startStop $command ..."
        ;;
    (standalone)
        echo "doing $startStop $command ..."
        ;;
    (proxy)
        echo "doing $startStop $command ..."
        ;;
    (*)
        echo "Error: unknown service name $command"
        usage
        exit 1
        ;;
esac

export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log

pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE

rotate_out_log ()
{
    log=$1
    num=5
    if [ -n "$2" ]; then
        num=$2
    fi
    if [ -f "$log" ]; then # rotate logs
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num"
    fi
}

mkdir -p "$PULSAR_LOG_DIR"

case $startStop in
    (start)
        if [ -f $pid ]; then
            if kill -0 `cat $pid` > /dev/null 2>&1; then
                echo $command running as process `cat $pid`. Stop it first.
                exit 1
            fi
        fi

        rotate_out_log $out
        echo starting $command, logging to $logfile
        echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
        pulsar=$PULSAR_HOME/bin/pulsar-kafka
        nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
        echo $! > $pid
        sleep 1; head $out
        sleep 2;
        if ! ps -p $! > /dev/null ; then
            exit 1
        fi
        ;;

    (stop)
        if [ -f $pid ]; then
            TARGET_PID=$(cat $pid)
            if kill -0 $TARGET_PID > /dev/null 2>&1; then
                echo "stopping $command"
                kill $TARGET_PID

                count=0
                location=$PULSAR_LOG_DIR
                while ps -p $TARGET_PID > /dev/null; do
                    echo "Shutdown is in progress... Please wait..."
                    sleep 1
                    count=`expr $count + 1`
                    if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
                        break
                    fi
                done
                if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
                    echo "Shutdown completed."
                fi

                if kill -0 $TARGET_PID > /dev/null 2>&1; then
                    fileName=$location/$command.out
                    $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
                    echo "Thread dumps are taken for analysis at $fileName"
                    if [ "$1" == "-force" ]
                    then
                        echo "forcefully stopping $command"
                        kill -9 $TARGET_PID >/dev/null 2>&1
                        echo Successfully stopped the process
                    else
                        echo "WARNNING : $command is not stopped completely."
                        exit 1
                    fi
                fi
            else
                echo "no $command to stop"
            fi
            rm $pid
        else
            echo no "$command to stop"
        fi
        ;;

    (*)
        usage
        exit 1
        ;;
esac
3. Add the JARs that the Kafka producer depends on
Add the following three JARs to the {PULSAR_HOME}/lib directory on every node of the Pulsar cluster:
connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
4. Start the Pulsar service
- To verify that Pulsar service logs can be written to Kafka correctly, first start the Pulsar service in the foreground with the bin/pulsar-kafka command; if no exceptions occur, start it as a daemon with the bin/pulsar-daemon-kafka command.
- For example, to start the broker, execute the following command:
bin/pulsar-daemon-kafka start broker
- Run the ps command to view the broker process as follows:
The ${sys:...} properties in log4j2-kafka.yaml are filled from these JVM options and are used to instantiate a Kafka producer; the broker process's logs are then sent to the Kafka brokers through that producer.
5. Verify that Pulsar logs are written to the Kafka brokers
Start a Kafka consumer and subscribe to the topic that Log4j2 sends messages to; you should read message content like the following, with fields separated by spaces:
broker 1 2020-12-26 17:40:14.363 [prometheus-stats-44-1] [org.eclipse.jetty.server.RequestLog] INFO , 192.168.0.1 - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
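For illustration, here is a consumer-side sketch of how such a space-separated message breaks down into the retrieval fields, following the field order of the PatternLayout defined earlier. The sample line is simplified from the one above, and the hostname "node-1" and other leading values are invented for the example:

```python
# Illustrative sketch: split a consumed message into the leading retrieval
# fields of the PatternLayout (cluster hostname hostip module instance
# date time [thread] [class] level , message).
def parse_log_line(line):
    head, _, message = line.partition(" , ")
    parts = head.split(" ")
    return {
        "cluster": parts[0],
        "hostname": parts[1],
        "hostip": parts[2],
        "module": parts[3],
        "instance": parts[4],
        "timestamp": parts[5] + " " + parts[6],
        "thread": parts[7].strip("[]"),
        "class": parts[8].strip("[]"),
        "level": parts[9],
        "message": message,
    }

sample = ('pulsar-cluster node-1 192.168.0.1 broker 1 '
          '2020-12-26 17:40:14.363 [prometheus-stats-44-1] '
          '[org.eclipse.jetty.server.RequestLog] INFO , '
          '"GET /metrics/ HTTP/1.1" 200')
fields = parse_log_line(sample)
print(fields["module"], fields["level"])
```

In the real pipeline, Logstash (or an ingest pipeline) performs this split before the fields are indexed in Elasticsearch.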
6. Log retrieval
Open the Kibana page and search with the keyword fields, for example: cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"
The figure above shows the log retrieval results for a period of time; you can add Available fields to the results as needed. In this way, developers and operations personnel can quickly and effectively analyze the causes of Pulsar service anomalies from multiple dimensions through Kibana. This completes the solution for fast log retrieval in Apache Pulsar based on Log4j2 + Kafka + ELK.
Conclusion
Distributed and service-oriented architectures are currently a popular technical direction. In a production system, as the business keeps growing and applications and services scale rapidly, moving from a monolithic/vertical architecture to a distributed/microservice architecture is a natural choice, bringing reduced complexity, fault tolerance, independent deployment, horizontal scaling, and more. But it also brings new challenges, such as the efficiency of troubleshooting and the convenience of operations monitoring. Taking Apache Pulsar as an example, this article has shared how a Java process can use Log4j2 + Kafka + ELK to achieve fast retrieval of distributed, microservice logs and support service governance.
Recommended reading
Follow StreamCloudNative to discuss technology trends across various fields with the author 👇
- Collect logs to Pulsar using Elastic Beats
- How to use Apache Flume to send log data to Apache Pulsar
- KoP officially open source: native Kafka protocol supported on Apache Pulsar
Welcome to contribute
Did you get any inspiration from this article?
Do you have any unique experiences to share and grow with your community?
The Apache Pulsar community welcomes contributions. Apache Pulsar and StreamNative hope to provide a platform for people to share their Pulsar experience and knowledge, helping more people in the community learn about Pulsar. Scan the QR code to add the bot as a friend and get in touch about contributing 👇