Abstract:

Alibaba Cloud Log Service is a one-stop service for real-time data. Users only need to focus on analysis; the tedious work of data collection, integration with various storage and compute engines, data indexing, and querying is all handled by Log Service.

The most basic function of Log Service is LogHub, which supports real-time data collection and consumption. Its real-time consumption family already includes Spark Streaming, Storm, and StreamCompute (Blink); Flink is now added to the list.

Flink Connector

Flink Log Connector is a tool provided by Alibaba Cloud Log Service for connecting to Flink. It consists of two parts: a Consumer and a Producer.

The Consumer is used to read data from Log Service; it supports exactly-once semantics and shard load balancing. The Producer is used to write data into Log Service. To use the connector, add the following Maven dependencies to your project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.3</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.10</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>log-loghub-producer</artifactId>
    <version>0.1.8</version>
</dependency>

Code: GitHub

Usage

  1. Create a Logstore by referring to the Log Service documentation.
  2. If you use a sub-account, ensure that the Logstore RAM policy is configured correctly. See "Granting RAM sub-users access to Log Service resources".

1. Log Consumer

In the connector, the FlinkLogConsumer class provides the ability to subscribe to a Logstore in Log Service and implements exactly-once semantics. When using this class, the user does not need to care about changes in the number of shards in the Logstore.

Each Flink subtask is responsible for consuming some of the shards in the Logstore. If shards in the Logstore are split or merged, the shards consumed by a subtask change accordingly.

1.1 Setting Startup Parameters

Properties configProps = new Properties();
// Set the endpoint for accessing Log Service
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// Set the AccessKey ID and AccessKey Secret
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// Set the project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// Set the Logstore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// Set the position from which the Logstore is consumed
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// Set the deserializer for Log Service messages
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
        new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));

The above is a simple consumption example. We use java.util.Properties as the configuration holder; all Consumer configuration keys can be found in ConfigConstants.

Note that the number of subtasks in the Flink stream is independent of the number of shards in the Logstore. If the number of shards is greater than the number of subtasks, each subtask consumes several shards, with no shard consumed by more than one subtask; if it is smaller, some subtasks stay idle until new shards are created.
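For instance, here is a minimal sketch (reusing the deserializer and configProps from the example above, and assuming a parallelism of 4 purely for illustration) of how the subtask count is controlled:

// The consumer's subtask count is the Flink parallelism of the source operator,
// chosen independently of the shard count: with 8 shards, each of the 4 subtasks
// reads 2 shards; with 2 shards, 2 subtasks stay idle until new shards appear.
DataStream<RawLogGroupList> logStream = env
        .addSource(new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps))
        .setParallelism(4);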

1.2 Setting the Starting Consumption Position

Flink Log Consumer supports setting the starting position of shard consumption. By setting the ConfigConstants.LOG_CONSUMER_BEGIN_POSITION property, you can start consuming from the head of the shard, from the tail of the shard, or from a specific time. The possible values are as follows:

  • Consts.LOG_BEGIN_CURSOR: indicates consuming from the head of the shard, that is, consuming from the oldest data in the shard.
  • Consts.LOG_END_CURSOR: indicates consuming from the tail of the shard, that is, consuming from the latest data in the shard.
  • UnixTimestamp: an integer value passed as a string, expressed in seconds since 1970-01-01; data in the shard after this point in time is consumed.

The three values are set as follows:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
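As a sketch of the UnixTimestamp form, the value is just the epoch second rendered as a string, so a relative start position (for example, one hour ago, chosen here purely for illustration) can be derived when the job is set up:

// Start consuming from roughly one hour ago (epoch seconds, passed as a string)
long oneHourAgo = System.currentTimeMillis() / 1000 - 3600;
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, String.valueOf(oneHourAgo));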

1.3 Monitoring Consumption Progress (Optional)

Flink Log Consumer supports monitoring of consumption progress. Consumption progress here means the real-time consumption position of each shard, expressed as a timestamp. For details, see the documents "Consumer group – View status" and [Consumer group – Monitoring and alarms](help.aliyun.com/document_de…).

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");

Note that this setting is optional. If it is set, the Consumer first creates the consumer group (if it already exists, nothing is done), and the Consumer's snapshots are automatically synchronized to the consumer group in Log Service, so users can view consumption progress in the Log Service console.

1.4 Disaster Recovery and Exactly-Once Semantics

When Flink's checkpointing feature is enabled, Flink Log Consumer periodically saves the consumption progress of each shard. When a job fails, Flink restores the Log Consumer and resumes consumption from the latest saved checkpoint.

The checkpoint interval defines the maximum amount of data that will be backtracked (that is, re-consumed) when a failure occurs. It is configured as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable Flink exactly-once semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Save a checkpoint every 5000 ms
env.enableCheckpointing(5000);

For more details, see the official Flink checkpoint documentation.

1.5 Supplementary Material: Related APIs and Permission Settings

Flink Log Consumer uses the following Alibaba Cloud Log Service APIs:

  • GetCursorOrData

    Used to pull data from a shard. Note that frequent calls to this API may cause data traffic to exceed the shard quota of Log Service. ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS and ConfigConstants.LOG_MAX_NUMBER_PER_FETCH control the interval between calls and the number of logs pulled per call. For the shard quota, see the document Shard overview (https://help.aliyun.com/document_detail/28976.html).
    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    Used to get the list of all shards in the Logstore and their status. If your shards are frequently split and merged, you can adjust the call interval of this API to detect shard changes sooner.
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup

    This API is called only when consumption progress monitoring is configured. It creates the consumer group that is used to synchronize checkpoints.
  • ConsumerGroupUpdateCheckPoint

    This API synchronizes Flink's snapshots to the consumer group in Log Service.

Sub-users need to be granted a RAM policy covering the APIs above before they can use Flink Log Consumer:



2. Log Producer

Note that the Producer only supports at-least-once semantics, which means that in the event of a job failure, some data written to Log Service may be duplicated, but it is never lost.

As an example, we write simulated, randomly generated strings to Log Service:

class SimpleLogSerializer implements LogSerializationSchema<String> {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int) (System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}

public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ProducerSample.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // Set the endpoint for accessing Log Service
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // Set the AccessKey ID and AccessKey Secret
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // Set the project to write logs to
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // Set the Logstore to write logs to
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer<String> logProducer =
                new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }

    public static class EventsGenerator implements SourceFunction<String> {
        private boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 Initialization

There are two main things that Producer initialization needs to do:

  • Initialize the configuration properties. This step is similar to the Consumer. The Producer has some custom parameters; in general the default values can be used, but they can be customized in special scenarios:

    // The number of I/O threads used to send data
    ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
    // How long log data is cached before being sent, in milliseconds; the default is 3000
    ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
    // The number of logs in a cached package; the default is 4096
    ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
    // The size of a cached package; the default is 3 MB
    ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
    // The total amount of memory the job can use; the default is 100 MB
    ConfigConstants.LOG_MEM_POOL_BYTES
    The preceding parameters are not mandatory; you can simply use the default values.
  • Override LogSerializationSchema to define a method for serializing data into RawLogGroup.

    RawLogGroup is a collection of logs. For the meaning of each field, see the document Log data model (https://help.aliyun.com/document_detail/29054.html). A further serialization sketch follows this list.
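As an illustration only, the sketch below serializes a hypothetical AccessEvent POJO (the type and its fields are assumptions made for this example, not part of the connector) instead of a plain string, putting each field into the RawLog as a separate key/value content with the same setTime/addContent/addLog calls used by SimpleLogSerializer above:

// A hypothetical event type, used only to illustrate multi-field serialization
class AccessEvent {
    String method;
    String path;
    int status;
}

class AccessEventSerializer implements LogSerializationSchema<AccessEvent> {
    public RawLogGroup serialize(AccessEvent event) {
        RawLog log = new RawLog();
        log.setTime((int) (System.currentTimeMillis() / 1000));
        // Each field becomes one key/value pair of the log
        log.addContent("method", event.method);
        log.addContent("path", event.path);
        log.addContent("status", String.valueOf(event.status));
        RawLogGroup group = new RawLogGroup();
        group.addLog(log);
        return group;
    }
}

A FlinkLogProducer<AccessEvent> would then be constructed with this serializer in the same way as in the example above.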

If a user needs to use the shardHashKey feature of Log Service, that is, to specify that data is written to a particular shard, a LogPartitioner can be set to generate a hash key for each record, as follows:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {
    // Generate a 32-character hash value for the element
    public String getHashKey(String element) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(element.getBytes());
            String hash = new BigInteger(1, md.digest()).toString(16);
            while (hash.length() < 32) hash = "0" + hash;
            return hash;
        } catch (NoSuchAlgorithmException e) {
        }
        return "0000000000000000000000000000000000000000000000000000000000000000";
    }
});

Note that the LogPartitioner is optional; if it is not set, data is written to a random shard.

2.2 Permission Settings: RAM Policy

The Producer relies on the following Log Service APIs to write data:

  • log:PostLogStoreLogs
  • log:ListShards

When a RAM sub-user uses the Producer, these two APIs must be authorized: