First, use sleep() in the shared resource to observe data corruption
Producer.java, Consumer.java, and TestDemo.java are the same in every version below; they all operate on the shared Resource class.
Resource.java (shared resource)
public class Resource {
    private String name;
    private String gender;

    // Called by the producer: sets the member variables of the shared
    // resource so that the consumer can print them.
    public void push(String name, String gender) {
        this.name = name;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.gender = gender;
    }

    // Called by the consumer: prints the data of the shared resource.
    public void pop() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.name + "-" + this.gender);
    }
}
Producer.java (producer)
public class Producer implements Runnable {
    public Resource resource = null;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                resource.push("feng", "female");
            } else {
                resource.push("spring", "male");
            }
        }
    }
}
Consumer.java (consumer)
public class Consumer implements Runnable {
    public Resource resource = null;

    // Constructor
    public Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        // Same count as the producer, so that the wait/notify versions
        // of Resource below terminate instead of leaving the producer waiting.
        for (int i = 0; i < 100; i++) {
            resource.pop();
        }
    }
}
TestDemo.java (test code)
public class TestDemo {
    public static void main(String[] args) {
        // Create the shared resource and start one producer and one consumer thread.
        Resource resource = new Resource();
        new Thread(new Producer(resource)).start();
        new Thread(new Consumer(resource)).start();
    }
}
Analysis of the results: the output contains lines such as feng-male, feng-female, feng-male, so the name and gender are mismatched. Why feng-male appears: the producer has just produced spring-male and the consumer has not consumed it yet; the producer goes on and sets the name to feng, and before it sets the new gender the consumer starts consuming, so it reads the new name together with the old gender.
Second, use a synchronization lock to avoid data corruption
Resource.java (shared resource)
public class Resource {
    private String name;
    private String gender;

    // The whole method holds the lock on this, so the consumer cannot
    // read between the two assignments.
    synchronized public void push(String name, String gender) {
        this.name = name;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.gender = gender;
    }

    synchronized public void pop() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.name + "-" + this.gender);
    }
}
The problem: the name and gender get mismatched.
- The solution: make sure that setting the name and setting the gender happen as one synchronized step, so that no consumer thread can come in and read the data in between.
- Synchronization can be achieved with a synchronized block, a synchronized method, or the Lock mechanism.
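The synchronized-block option from the list above could look roughly like the following sketch. The class name SyncBlockResource, the private lock field, and the countMismatches helper are illustrative assumptions and not part of the original code; pop() returns the pair instead of printing it so that the result can be checked.

```java
public class SyncBlockResource {
    // Hypothetical private lock object; the original Resource locks on "this".
    private final Object lock = new Object();
    private String name;
    private String gender;

    public void push(String name, String gender) {
        synchronized (lock) {
            this.name = name;
            Thread.yield(); // invite a context switch between the two writes
            this.gender = gender;
        }
    }

    public String pop() {
        synchronized (lock) {
            return name + "-" + gender;
        }
    }

    // Runs a producer thread against the calling thread and counts the
    // name/gender pairs that do not belong together.
    public static int countMismatches(int rounds) {
        SyncBlockResource r = new SyncBlockResource();
        r.push("feng", "female");
        Thread producer = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                if (i % 2 == 0) {
                    r.push("feng", "female");
                } else {
                    r.push("spring", "male");
                }
            }
        });
        producer.start();
        int mismatches = 0;
        for (int i = 0; i < rounds; i++) {
            String pair = r.pop();
            if (!pair.equals("feng-female") && !pair.equals("spring-male")) {
                mismatches++;
            }
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(10_000));
    }
}
```

Because both methods synchronize on the same object, a reader can never observe the state between the two assignments in push.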
Third, how to produce one item, then consume one item
- The output should alternate: feng-female -> spring-male -> feng-female -> spring-male ...
- Solution: use the wait and notify mechanism.
- wait(): the thread that calls this method releases the synchronization lock, and the JVM places the thread in the object's wait pool until another thread wakes it up.
- notify(): the thread that calls this method wakes up one arbitrary thread in the wait pool and moves it to the lock pool, where it competes for the lock again.
- notifyAll(): the thread that calls this method wakes up all threads in the wait pool and moves them to the lock pool.
- Note: these methods can only be called on the synchronization monitor lock object, by the thread that holds it; otherwise an IllegalMonitorStateException is thrown.
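The rule that wait and notify may only be called on a held monitor can be checked with a small sketch like this one. The class name MonitorDemo and its two helper methods are made up for illustration.

```java
public class MonitorDemo {
    // Calls wait() on an object whose monitor the current thread does NOT hold.
    // Returns true if that fails with IllegalMonitorStateException.
    public static boolean waitWithoutLockFails() {
        Object monitor = new Object();
        try {
            monitor.wait(10);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Calls notify() inside a synchronized block on the same object,
    // which is legal because the current thread owns the monitor.
    public static boolean notifyInsideSynchronizedWorks() {
        Object monitor = new Object();
        try {
            synchronized (monitor) {
                monitor.notify();
            }
            return true;
        } catch (IllegalMonitorStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("wait without lock fails: " + waitWithoutLockFails());
        System.out.println("notify inside synchronized works: " + notifyInsideSynchronizedWorks());
    }
}
```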
Resource.java (shared resource)
public class Resource {
    private String name;
    private String gender;
    // true when the shared resource holds no data waiting to be consumed
    private boolean isEmpty = true;

    // Called by the producer.
    synchronized public void push(String name, String gender) {
        try {
            while (!isEmpty) {
                // The shared resource still holds data: wait for the consumer.
                // wait() must be called on the lock object (this).
                this.wait();
            }
            // Start producing.
            this.name = name;
            Thread.sleep(100);
            this.gender = gender;
            // Production finished: the shared resource is no longer empty.
            isEmpty = false;
            this.notify(); // wake up a consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Called by the consumer.
    synchronized public void pop() {
        try {
            while (isEmpty) {
                // The shared resource is empty: wait for the producer.
                // wait() must be called on the lock object (this).
                this.wait();
            }
            // Start consuming.
            Thread.sleep(100);
            System.out.println(this.name + "-" + this.gender);
            // Consumption finished: the shared resource is empty again.
            isEmpty = true;
            this.notify(); // wake up the other thread
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Fourth, thread communication: using the Lock and Condition interfaces
The wait and notify methods can only be called on the synchronization monitor lock object; otherwise an IllegalMonitorStateException is thrown. The problem is that the Lock mechanism has no synchronization monitor lock at all, and no notion of acquiring and releasing a lock automatically. Because there is no monitor lock, code that uses Lock cannot call wait and notify. Solution: Java 5 provides the Condition interface, which handles thread communication for the Lock mechanism and is used together with Lock.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Resource {
    private String name;
    private String gender;
    private boolean isEmpty = true;
    private final Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void push(String name, String gender) {
        lock.lock();
        try {
            while (!isEmpty) {
                condition.await();
            }
            // Start producing.
            this.name = name;
            Thread.sleep(100);
            this.gender = gender;
            isEmpty = false;
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void pop() {
        lock.lock();
        try {
            while (isEmpty) {
                condition.await();
            }
            Thread.sleep(100);
            System.out.println(this.name + "-" + this.gender);
            // Consumption finished.
            isEmpty = true;
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
Fifth, the thread life cycle
Thread states
- Claim 1 (state diagram)
- Claim 2 (state diagram)
Some people group the blocked, waiting, and timed waiting states together and call them all the blocked state.
The states of a thread object are defined in an inner class of Thread, Thread.State.
Note that Thread.State is actually an enum. Because a thread object has exactly six fixed states, an enum is the most suitable representation.
1. New: the thread object has been created with new, which only allocates memory on the heap, and start has not been called yet. In the new state the thread has not been started at all; only the thread object exists. Thread t = new Thread(); // t is in the new state
When start is called on a thread object in the new state, it moves from new to runnable. A thread object's start method can only be called once; a second call throws IllegalThreadStateException.
2. Runnable: this state covers two sub-states, ready and running. Ready: after start has been called, the thread object waits for the JVM to schedule it (the thread is not running yet). Running: the thread object has been scheduled by the JVM; if there are multiple CPUs, multiple threads can run in parallel.
3. Blocked: a running thread gives up the CPU for some reason and temporarily stops running. The JVM does not allocate CPU time to it until it re-enters the ready state, from which it has a chance to run again. A blocked thread can only move to the ready state first; it cannot go directly to running. Two cases of blocking:
1) Thread A is running and tries to acquire a synchronization lock that thread B already holds. The JVM places thread A in the object's lock pool, and A enters the blocked state.
2) A running thread issues an I/O request and blocks until it completes. (This is blocking in the operating-system sense; Thread.getState() reports RUNNABLE, not BLOCKED, for a thread waiting on I/O.)
4. Waiting (a waiting thread can only be woken up by another thread): entered through the wait method with no arguments.
- 1) When a running thread calls wait(), the JVM places the current thread in the object's wait pool.
5. Timed waiting: entered through the wait method with a timeout argument or through the sleep method.
6. Terminated: commonly called dead; the thread has finished.
1) It exits after the run method completes normally (normal death).
2) It exits because of an exception (after the exception occurs, the thread's execution is cut short) (accidental death).
Once a thread has terminated it cannot be started again; calling start on it throws IllegalThreadStateException.
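The state transitions described above can be observed with Thread.getState(). The following is a minimal sketch; the class name ThreadStateDemo and its helpers are illustrative assumptions. It polls for each state with a deadline instead of relying on timing, and it only covers NEW, WAITING, BLOCKED, and TERMINATED.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadStateDemo {
    // Polls until the thread reaches the expected state or about 5 seconds pass,
    // then returns whatever state the thread is in.
    private static Thread.State waitFor(Thread t, Thread.State expected) {
        long deadline = System.currentTimeMillis() + 5000;
        while (t.getState() != expected && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
        return t.getState();
    }

    // Records the states a worker thread passes through.
    public static List<Thread.State> observe() {
        final Object lock = new Object();
        List<Thread.State> seen = new ArrayList<>();
        Thread worker = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(); // WAITING until another thread calls notify
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        seen.add(worker.getState()); // not started yet
        worker.start();
        seen.add(waitFor(worker, Thread.State.WAITING));
        synchronized (lock) {
            lock.notify();
            // The worker is awake but cannot re-enter the monitor while we hold it.
            seen.add(waitFor(worker, Thread.State.BLOCKED));
        }
        seen.add(waitFor(worker, Thread.State.TERMINATED));
        return seen;
    }

    // Returns true if calling start() a second time throws IllegalThreadStateException.
    public static boolean secondStartFails() {
        Thread t = new Thread(() -> { });
        t.start();
        try {
            t.start();
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
        System.out.println("second start fails: " + secondStartFails());
    }
}
```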
Deprecated methods in Thread (deprecated because they are unsafe):
- void suspend(): suspends the thread
- void resume(): resumes a suspended thread
- void stop(): terminates the thread
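Since stop() is deprecated, a common alternative is to ask a thread to stop cooperatively through its interrupt flag. This is a sketch of that general technique, not something taken from the original article; the class name InterruptStopDemo and the method stopsAfterInterrupt are illustrative.

```java
public class InterruptStopDemo {
    // Starts a worker that loops until it is interrupted, interrupts it,
    // and returns true if the worker has ended within about 5 seconds.
    public static boolean stopsAfterInterrupt() {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10); // stands in for a unit of real work
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws; set it again
                    // so that the loop condition sees the request to stop.
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + stopsAfterInterrupt());
    }
}
```

The worker decides where it is safe to stop, which is exactly what stop() could not guarantee.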
Sixth, joining threads:
A thread's join method makes the calling thread wait for another thread to finish before it continues. After join is called, the calling thread is blocked until the joined thread completes. Some people call this joining threads: the current thread and the joined thread are combined into one sequential flow.
class Join extends Thread {
    public void run() {
        for (int i = 0; i < 50; i++) {
            System.out.println("join:" + i);
        }
    }
}

public class UniteThread {
    public static void main(String[] args) throws Exception {
        System.out.println("begin.....");
        Join joinThread = new Join();
        for (int i = 0; i < 50; i++) {
            System.out.println("main:" + i);
            if (i == 10) {
                // Start the join thread.
                joinThread.start();
            }
            if (i == 20) {
                // Make main wait until joinThread has finished.
                joinThread.join();
            }
        }
        System.out.println("end");
    }
}
Seventh, background threads
Background thread: a thread that runs in the background to provide services to other threads, also known as a "daemon thread". The JVM's garbage collection thread is a typical background thread. Characteristics: once all foreground threads have died, the background threads die automatically; as long as a foreground thread is still running, background threads are not forced to end. To test whether a thread object is a background thread, use thread.isDaemon(). A thread created by a foreground thread is a foreground thread by default and can be turned into a background thread with setDaemon(true); a new thread is a background thread by default if and only if it was created by a background thread. Setting a background thread: thread.setDaemon(true) must be called before start, otherwise an IllegalThreadStateException is thrown.
public class DaemonThread extends Thread {
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println(super.getName() + "-" + i);
        }
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().isDaemon());
        for (int i = 0; i < 50; i++) {
            System.out.println("main:" + i);
            if (i == 10) {
                DaemonThread t = new DaemonThread();
                t.setDaemon(true);
                t.start();
            }
        }
    }
}
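The two daemon rules stated above (setDaemon must come before start, and a thread created by a daemon thread is itself a daemon by default) can be checked with a sketch like this. The class name DaemonRulesDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DaemonRulesDemo {
    // Returns true if setDaemon(true) on an already started thread
    // throws IllegalThreadStateException.
    public static boolean setDaemonAfterStartFails() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(200); // keep the thread alive while we call setDaemon
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        try {
            t.setDaemon(true);
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        } finally {
            t.interrupt(); // let the helper thread finish early
        }
    }

    // Returns true if a thread created inside a daemon thread is a daemon by default.
    public static boolean childOfDaemonIsDaemon() {
        AtomicBoolean childIsDaemon = new AtomicBoolean(false);
        Thread parent = new Thread(() -> {
            Thread child = new Thread(() -> { });
            childIsDaemon.set(child.isDaemon());
        });
        parent.setDaemon(true);
        parent.start();
        try {
            parent.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return childIsDaemon.get();
    }

    public static void main(String[] args) {
        System.out.println("setDaemon after start fails: " + setDaemonAfterStartFails());
        System.out.println("child of daemon is daemon: " + childOfDaemonIsDaemon());
    }
}
```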
Thread pool usage
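import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Executors.newCachedThreadPool();     // creates a cached pool whose maximum size is Integer.MAX_VALUE
// Executors.newSingleThreadExecutor(); // creates a pool with a capacity of 1
// Executors.newFixedThreadPool(int);   // creates a pool with a fixed number of threads
class MyTask implements Runnable {
    public MyTask() {
    }

    @Override
    public void run() {
        // do something
    }
}

// ThreadPoolUsage is only a wrapper class so that the snippet compiles.
public class ThreadPoolUsage {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        MyTask myTask = new MyTask();
        executor.execute(myTask);
        executor.shutdown(); // let the JVM exit once the task has finished
    }
}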
As for how many records to submit per batch: the smaller the batch, the faster each submission, but there are more submissions and the total time grows. If a batch is too large, execution becomes very slow and may even fail. After several rounds of testing, a few thousand to ten thousand records per batch proved acceptable. Then there is the question of which thread pool to choose: fixed size or unbounded growth. What happens when the number of threads exceeds the limit? With the default rejection policy, a pool that has run out of capacity throws an exception. An experienced colleague remarked, rather dismissively, that a blocking arrangement is generally more reasonable, for example a pool with a wait queue backed by a blocking queue. Creating new threads without limit does not seem reasonable either.
A thread pool with a queue
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
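To see what happens when such a pool runs out of capacity, here is a minimal sketch that uses the same constructor arguments. The class name BoundedPoolDemo, the countRejected helper, and the latch that keeps every task busy are illustrative assumptions. With 5 core threads, a queue of 5, and at most 10 threads, the pool can hold 15 unfinished tasks at once; further submissions are rejected by the default policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    // Submits the given number of tasks, none of which can finish until all
    // submissions are done, and returns how many submissions were rejected.
    public static int countRejected(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // thrown by the default AbortPolicy
            }
        }
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected out of 20: " + countRejected(20));
    }
}
```

If rejection is not acceptable, ThreadPoolExecutor also accepts a RejectedExecutionHandler such as ThreadPoolExecutor.CallerRunsPolicy as an extra constructor argument, which makes the submitting thread run the task itself.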
Use producers and consumers to improve the program
Producerlocal.java (producer)
import java.util.concurrent.ArrayBlockingQueue;

public class Producerlocal implements Runnable {
    ArrayBlockingQueue<String> queue;

    public Producerlocal(ArrayBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 1000; i++) {
                queue.put("s" + i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Consumerlocal.java (consumer)
import java.util.concurrent.ArrayBlockingQueue;

public class Consumerlocal implements Runnable {
    ArrayBlockingQueue<String> queue;

    public Consumerlocal(ArrayBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                final String take = queue.take();
                // A poison pill tells this consumer to stop.
                if ("poisonpill".equals(take)) {
                    return;
                }
                // do something
                System.out.println(take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Main.java (main program)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        int threadNum = Runtime.getRuntime().availableProcessors() * 2;
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        // One pool thread per consumer, so that all consumers run at the same time.
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executor.execute(new Consumerlocal(queue));
        }
        Thread pt = new Thread(new Producerlocal(queue));
        pt.start();
        pt.join();
        // One poison pill per consumer.
        for (int i = 0; i < threadNum; i++) {
            queue.put("poisonpill");
        }
        executor.shutdown();
        executor.awaitTermination(10L, TimeUnit.DAYS);
    }
}
The program uses a blocking queue with a fixed capacity: putting into a full queue blocks, and taking from an empty queue blocks. Interested readers can look at the JDK source code. The number of consumer threads is twice the number of CPUs. Using these classes requires reading the documentation and writing test code. Deciding when to terminate the threads also takes a small trick: add enough poison pills to the queue, one per consumer.
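The blocking behaviour can be tried out without hanging a test by using the timed variants offer and poll instead of put and take. This is a small sketch; the class name BlockingQueueDemo, the capacity of 2, and the 50 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    // Fills a queue of capacity 2, then tries to add a third element.
    // Returns true if the timed offer gives up, which is where put() would block.
    public static boolean offerToFullQueueTimesOut() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("s0");
            queue.put("s1");
            return !queue.offer("s2", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Polls an empty queue. Returns true if the timed poll gives up with null,
    // which is where take() would block.
    public static boolean pollFromEmptyQueueTimesOut() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer to full queue timed out: " + offerToFullQueueTimesOut());
        System.out.println("poll from empty queue timed out: " + pollFromEmptyQueueTimesOut());
    }
}
```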
With the new pattern in place, the program is noticeably faster, and that basically wraps up the producer-consumer pattern. The next time you have a program that needs multithreading and fits this pattern, it is worth applying. For now, roll up your sleeves and write some test code to get a feel for it.
Because most of the program's time is still spent on HTTP requests, its running time is still unacceptable. I thought of using asynchronous I/O so that the HTTP calls would not block. The problem is that the existing HTTP client has been modified for security authentication, including encrypted authentication and single sign-on. Adapting a new client would be difficult and take time, and I am also afraid it might fail. As for asynchronous non-blocking I/O: given the earlier tuning experience, non-blocking is not necessarily better! In truth, I do not know how to use it together with multithreading, nor what the effect would be.
Maven dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1.3</version>
</dependency>
Asynchronous HTTP
/*
* ===============================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* =============================================== *
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package com.github.yfor.bigdata.tdg;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncCharConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
/**
 * This example demonstrates a pipelined execution of multiple HTTP request / response exchanges
* with a full content streaming.
*/
public class MainPhttpasyncclient {
    public static void main(final String[] args) throws Exception {
        CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
        try {
            httpclient.start();
            HttpHost targetHost = new HttpHost("httpbin.org", 80);
            HttpGet[] resquests = {
                new HttpGet("/"),
                new HttpGet("/ip"),
                new HttpGet("/headers"),
                new HttpGet("/get")
            };
            List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
            List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
            for (HttpGet request : resquests) {
                requestProducers.add(new MyRequestProducer(targetHost, request));
                responseConsumers.add(new MyResponseConsumer(request));
            }
            Future<List<Boolean>> future = httpclient.execute(
                targetHost, requestProducers, responseConsumers, null);
            future.get();
            System.out.println("Shutting down");
        } finally {
            httpclient.close();
        }
        System.out.println("Done");
    }

    static class MyRequestProducer extends BasicAsyncRequestProducer {
        private final HttpRequest request;

        MyRequestProducer(final HttpHost target, final HttpRequest request) {
            super(target, request);
            this.request = request;
        }

        @Override
        public void requestCompleted(final HttpContext context) {
            super.requestCompleted(context);
            System.out.println();
            System.out.println("Request sent: " + this.request.getRequestLine());
            System.out.println("=================================================");
        }
    }

    static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
        private final HttpRequest request;

        MyResponseConsumer(final HttpRequest request) {
            this.request = request;
        }

        @Override
        protected void onResponseReceived(final HttpResponse response) {
            System.out.println();
            System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
            System.out.println("=================================================");
        }

        @Override
        protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
            while (buf.hasRemaining()) {
                buf.get();
            }
        }

        @Override
        protected void releaseResources() {
        }

        @Override
        protected Boolean buildResult(final HttpContext context) {
            System.out.println();
            System.out.println("=================================");
            System.out.println();
            return Boolean.TRUE;
        }
    }
}
Configuration
package com.github.yfor.bigdata.tdg;

public interface KafkaProperties {
    final static String zkConnect = "localhost:2181";
    final static String groupId = "group21";
    final static String topic = "topic4";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
Kafka takes some time to configure. Read the official documentation to get it up and running.
Producer thread
package com.github.yfor.bigdata.tdg;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;
    private final int size; // number of partitions of the topic

    public Producer(String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
        this.isAsync = true;
        this.size = producer.partitionsFor(topic).size();
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (messageNo < 100) {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Send asynchronously; the callback reports the result.
                producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
            } else {
                // Send synchronously: get() waits for the broker's acknowledgement.
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), "
                            + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}
Consumer thread
package com.github.yfor.bigdata.tdg;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final int size; // number of streams and of worker threads

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        this.size = 5;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, size);
        // Use the connector created in the constructor.
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // One worker thread per stream.
        ExecutorService executor = Executors.newFixedThreadPool(size);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }
}

class KafkaConsumerThread implements Runnable {
    private KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], " + new String(mam.message()));
        }
    }
}