Multithreaded implementation

Implementation threads are the foundation of the foundation in concurrent programming, because we have to implement threads before we can proceed with a series of operations.

Basic implementation

Runable

public class ImplementRunable implements Runnable {
    @Override
    public void run(a) {
        while (true) {
            // Prints the thread name, separate from the main thread name
            System.out.println(Thread.currentThread().getName());
            try {
                // The thread sleeps for one second
                Thread.sleep(1000);
            } catch (Exception e) {
                throw newRuntimeException(e); }}}public static void main(String[] args) {
        new Thread(newImplementRunable()).start(); }}Copy the code

It’s essentially a new Thread, passing in a Runable object as a constructor

Thread

The most common way to do this is by inheriting Thread classes or directly new Thread objects, but we know that Java is single-inherited, that is, if we inherit Thread classes, we cannot inherit other classes, so this is why we often use Runable to implement multithreading. At this point we can pass the Runable object to the Thread constructor. The following are common constructors for Thread

Let’s look at an example

public class ExtendsThread extends Thread {
    
    public ExtendsThread(a) {
        // Set the name of the current thread
        this.setName("MyThread");
    }

    @Override
    public void run(a) {
        // Outputs the name of the current thread every 1s
        while (true) {
            // Prints the thread name, separate from the main thread name
            System.out.println(Thread.currentThread().getName());
            try {
                // The thread sleeps for one second
                Thread.sleep(1000);
            } catch (Exception e) {
                throw newRuntimeException(e); }}}public static void main(String[] args) {
        newExtendsThread().start(); }}Copy the code

Callable

Runnable and Callable both represent tasks that are to be executed in different threads. Runnable has been available since JDK1.0, Callable was added in JDK1.5.

The main difference is that Callable’s call() method can return values and throw exceptions, while Runnable’s Run () method does not, and Callable can return Future objects loaded with computed results.

public class ImplementCallable implements Callable<Integer> {
    @Override
    public Integer call(a) throws Exception {
        return new Random().nextInt();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // Create a thread pool
        ExecutorService service = Executors.newFixedThreadPool(1);
        // Submit the task and return the result with a Future submission
        Future<Integer> future = service.submit(newImplementCallable()); Integer integer = future.get(); System.out.println(integer); }}Copy the code

Note, however, that Callable cannot be used with threads directly, but with Thread pools. We also demonstrated how to retrieve results from Future objects. In fact, we can also use Callable with FutureTask here, which we will demonstrate later when demonstrating FutureTask

TimerTask

public class TimerTaskDemo {
    /** * Prints: Hello world ** after 100ms delay@param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        Timer t = new Timer();
        t.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run(a) {
                System.out.println("hello world"); }},100.1000); }}Copy the code

This may seem like a new implementation, but when you look at the implementation of TimerTask, you’ll see that this class actually inherits from Runnable which means that this is actually how Runnable is implemented, Put the TimerTask into a TaskQueue, wait until it is scheduled, and then execute the TimerTask’s run method.

ScheduledExecutorService (ScheduledExecutorService, ScheduledExecutorService, ScheduledExecutorService, ScheduledExecutorService, ScheduledExecutorService, ScheduledExecutorService, ScheduledExecutorService

Let’s comb out a task scheduling process

  1. We created a Timer object

     public Timer(a) {
         this("Timer-" + serialNumber());
     }
     public Timer(String name) {
         thread.setName(name);
         thread.start();
    }
    Copy the code

    We see that the constructor starts a thread called TimerThread

  2. The scheduleAtFixedRate method is used to create and schedule a scheduled task, which is simply added to the TaskQueue

    public void scheduleAtFixedRate(TimerTask task, Date firstTime,
                                    long period) {
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, firstTime.getTime(), period);
    }
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");
    
        // Constrain value of period sufficiently to prevent numeric
        // overflow while still being effectively infinitely large.
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;
    
        synchronized(queue) {
            if(! thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");
    
            synchronized(task.lock) {
                if(task.state ! = TimerTask.VIRGIN)throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }
          	// Add the task to the queue.
            queue.add(task);
            if(queue.getMin() == task) queue.notify(); }}Copy the code
  3. The Timer implements its NB scheduling function, which we are not involved in, because it belongs to the black box and is not black in fact. Let’s take a look, the secret lies in the TimerThread, we know that the thread object was started when the Timer was created

    private TaskQueue queue;
    
    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }
    // Thread entry, run method
    public void run(a) {
        try {
          	// Call the core method -- loop
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references}}}/** * The main timer loop. (See class comment.) */
    private void mainLoop(a) {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait until the task queue is empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die
    
                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                  	// Get the task from the task queue
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0? currentTime - task.period : executionTime + task.period); }}}if(! taskFired)// Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
              	// To execute the task, we see that instead of starting a new thread, the execution is blocked
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
    Copy the code

    So here we know that a Timer is a single thread that executes a timed task, which means that maybe your task is running out of time and still hasn’t been executed, because the last task hasn’t been executed yet, so let’s write an example here, which is the example above, and just change it

    public static void main(String[] args) throws InterruptedException {
        timer();
    }
    // In general we still want 1s execution
    public static void timer(a){
        Timer t = new Timer();
        t.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run(a) {
                System.out.println(Thread.currentThread().getName()+ ": hello world");
                // We waited in this task
                try {
                    TimeUnit.SECONDS.sleep(1000000);
                } catch(InterruptedException e) { e.printStackTrace(); }}},100.1000);
    }
    Copy the code

    In fact, once you’ve executed it, you can’t guarantee that your task TimerTask will execute once, because the last task hasn’t finished yet. This is why IDEA recommends ScheduledExecutorService, which is essentially a thread pool and will be covered when we learn about thread pool implementation.

FutureTask

In Java concurrent programs, FutureTask represents an asynchronous operation that can be cancelled. It has methods to start and cancel operations, to check whether an operation is complete, and to retrieve the result of an operation.

The result can only be retrieved when the operation is complete, and the GET method will block if the operation is not complete.

A FutureTask object can wrap Callable and Runnable objects, Since FutureTask also implements the Runnable interface, it can be submitted to executors for execution or used with threads. But there is a problem. We know that Runnable does not return a value. So let’s see

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        callableDemo();
        runableDemo();
    }

    public static void callableDemo(a) throws ExecutionException, InterruptedException {
        Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call(a) throws Exception {
                System.out.println("Calculating the results...");
                Thread.sleep(3000);
                return 1; }}; FutureTask<Integer> task =new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("The result of callableDemo is:" + result);
    }


    public static void runableDemo(a) throws ExecutionException, InterruptedException {
        Runnable run = new Runnable() {
            @SneakyThrows
            @Override
            public void run(a) {
                System.out.println("Calculating the results...");
                Thread.sleep(3000); }};// The return value is predefined by ourselves
        FutureTask<Integer> task = new FutureTask(run,1);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("The result of runableDemo is:"+ result); }}Copy the code

In fact, we see that FutureTask essentially uses either Callable or Runnable

Advanced implementation

Java8 lambda expression

This is a syntactic sugar, and it’s essentially the same old thing, but let’s take a quick look at what this really is, which is essentially a functional interface

public class LambdaDemo {
    public static void main(String[] args) {
        lambdaThread();
        lambdaRunable();
    }


    public static void lambdaThread(a) {
        Thread t = new Thread(() -> {
            System.out.println("Implementation of lambdaThread");
        });
        t.start();
    }

    public static void lambdaRunable(a) {
        Runnable r = () -> {
            System.out.println("Implementation of lambdaRunable");
        };
        Thread t1 = new Thread(r);
        Thread t2 = newThread(() -> { r.run(); }); t1.start(); t2.start(); }}Copy the code

Java8 stream

This mainly uses the stream API in Java8

public class StreamDemo { public static void main(String[] args) { ,2,3,4,5,6,7,8,9,10 Stream of (1) the parallel (). The forEach (ele - > {System. Out. Println (Thread. The currentThread (). The getName () + ":" + ele); }); }}Copy the code

Output: We see multiple threads started

ForkJoinPool.commonPool-worker-1:3
ForkJoinPool.commonPool-worker-1:4
ForkJoinPool.commonPool-worker-5:5
ForkJoinPool.commonPool-worker-5:10
ForkJoinPool.commonPool-worker-4:1
ForkJoinPool.commonPool-worker-2:9
ForkJoinPool.commonPool-worker-3:2
ForkJoinPool.commonPool-worker-5:6
ForkJoinPool.commonPool-worker-1:8
main:7
Copy the code

The default number of threads used by concurrent streams is equal to the number of processor cores on your machine. By this method can modify this value, this is a global property, System. SetProperty (” java.util.concurrent.ForkJoinPool.com mon. Parallelism “, “12”);

The fork/join framework was introduced in jdk1.7, and it is based on this framework that java8 stream multithreading is not a stream. Therefore, to understand concurrent streams, you need to learn the fork/join framework. The purpose of the fork/ Join framework is to recursively split tasks that can be parallel into smaller tasks, and then combine the results of each subtask to produce an overall result. It is an implementation of the ExecutorService interface, which assigns subtasks to worker threads in a ForkJoinPool. To submit a task to this thread pool, you must create a subclass of RecursiveTask, or a subclass of RecursiveAction if the task does not return a result.

The thread pool

Instead of going into detail about thread pools, let’s talk about how thread pools implement multi-threading, that is, how thread pools create threads, and we know that the purpose of using thread pools is to avoid manually creating a large number of threads, and to give control to thread pools to achieve thread reuse.

Let’s first look at how we use thread pools

public class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        while (true) {
            executorService.submit(() -> {
                while (true) {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch(InterruptedException e) { e.printStackTrace(); }}}); TimeUnit.SECONDS.sleep(1); }}}Copy the code

From here we can see that all we have to do is submit a Runnable object to the thread pool. As we can probably guess from the way we implemented threads with Runnable, the thread pool creates threads for us from the Runnable object we submitted. When we create a thread pool to actually have such a parameter, is used to create a thread factory, Executors, newCachedThreadPool newCachedThreadPool () method is just Java provides us with a convenient way, The following constructor will eventually be called.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
Copy the code

Now we can take a look at the factory

/** * The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private finalString namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix ="pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
  	// Here is how we create the thread
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if(t.getPriority() ! = Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);returnt; }}Copy the code

For a thread pool, threads are essentially created through a thread factory. DefaultThreadFactory is used by default, which sets default values for threads created by the thread pool, such as the name of the thread, whether it is a daemon thread, and the priority of the thread. No matter how you set these properties, it still ends up creating threads through new Thread(), but the constructor here takes a few more arguments, so creating threads through a Thread pool is not a step away from the two basic ways of creating threads. Because it’s still essentially implemented through new Thread().

The start and state of a thread

Start method and run method

If you call run directly, it’s a synchronous call.

Many start

So let’s see what happens when we start multiple times

public class ThreadStartTimes {
    public static void main(String[] args) {
        Runnable target;
        Thread thread = new Thread(()->{
            System.out.println(Thread.currentThread().getName());
        });

        thread.start();
        System.out.println(1);
        thread.start();
        System.out.println(2);
        thread.start();
        System.out.println(3); }}Copy the code

The output is as follows:

1
Thread-0
Exception in thread "main" java.lang.IllegalThreadStateException
	at java.lang.Thread.start(Thread.java:708)
	at thread.thread.ThreadStartTimes.main(ThreadStartTimes.java:12)
Copy the code

We see an error, so let’s look at the implementation of this method

public synchronized void start(a) {
    /** * if our thread is in the NEW state, Buta pile cannot be called any more than A pile of facts. Buta pile cannot be called any more than A pile of facts
    if(threadStatus ! =0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if(! started) { group.threadStartFailed(this); }}catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */}}}Copy the code

That is, if the threadStatus of our thread is not zero after the first call to start, an error will be reported if you call the method again. Note, however, that you do not see state changes in Thread code, meaning that state changes are maintained by the local method

Understand the relationship between threads and Runnable

It’s important to understand this, and that’s why thread pools exist. If we look at the following code, we see that the core logic of a thread is to call target.run(), and in this case target is runnable. Okay

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}
Copy the code

Because normally our run method or our thread object holds only one runnable object, it is often a mistake to assume that runnable is functionally equivalent to thread. In fact, we can look at the following code, a core piece of thread pool code. We’ll explain this later when we learn about thread pools

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      	// Here is the point
        while(task ! =null|| (task = getTask()) ! =null) {
            w.lock();
            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally{ afterExecute(task, thrown); }}finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally{ processWorkerExit(w, completedAbruptly); }}Copy the code

We see that instead of simply holding a Runnable object and running it, and when it’s done, the thread is done, the way we do this is that the thread runs a Runnable object, gets another Runnable object from the queue, That is, one thread runs multiple Runnable objects, that’s how thread pools work, that’s how threads relate to Runnable

conclusion

  1. Both Callable and FutureTask are first and foremost, like Runnable, a task that needs to be executed, rather than threads themselves. They can be executed in a thread pool, as shown in the code. Submit () puts the task into a thread pool, and the thread pool creates the thread. Whatever method is used, the thread ultimately executes the task. Implementing the Runnable interface and inheriting the Thread class. Always remember the Thread is the Thread, the task is the task, the Thread is the Thread is Runnable task or something like that, only in this way can you understand the meaning of the Thread pool, that is to say, we give the task to the Thread pool, Thread pool with a pool of threads run our task, when a task to run out, this Thread can run other tasks, Thus achieving the reuse of threads.

  2. Callable, FutureTask, and Future were all introduced in JDK1.5, so there is essentially only one way to implement multithreading because other implementations must use the Thread class. So we can understand that other classes such as Callable, FutureTask, and Runnable are task objects that need to be put into threads to execute.

  3. Which is better, Thread or Runnable? Runnable

    1. First, because Java is single-inheritance only, we can use Runnable to implement functions, and our objects can inherit other classes as needed
    2. The second is that each creation of a Thread is an actual Thread overhead, so we can actually execute multiple runnables with a single Thread object, which is interesting. This is how Thread pools work.
    3. Finally, the design of the code, through Runnable to achieve multi-threading, to achieve the decoupling of Runnable and Thread class, Thread class is responsible for Thread startup and property Settings, Runnable encapsulates our business logic, This is why we can use threads to execute multiple Runnable objects, because they are positioned and designed differently. Threads are physical threads, and Runnable is business logic to execute on threads.