Java并发编程 - Callable、Future和FutureTask的达成
发布时间:2021-12-05 13:12:10 所属栏目:教程 来源:互联网
导读:启动线程执行任务,如果需要在任务执行完毕之后得到任务执行结果,可以使用从Java 1.5开始提供的Callable和Future 下面就分析一下Callable、Future以及FutureTask的具体实现及使用方法 源码分析基于JDK 1.7 一、Callable 与 Runnable java.lang.Runnable是一
boolean queued = false; for (;;) { //如果主线程已经被中断,removeWaiter(),并上抛InterruptedException //注意:Thread.interrupted()后会导致线程的中断状态为false if (Thread.interrupted()) { removeWaiter(q); //线程被中断的情况下,从waiters链表中删除q throw new InterruptedException(); } int s = state; //如果任务已经完成(可能是正常完成、异常、中断),直接返回,即还没有开始等待,任务已经完成了 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL else if (s == COMPLETING) // cannot time out yet Thread.yield(); //s<COMPLETING 且 还没有创建WaitNode else if (q == null) q = new WaitNode(); //s<COMPLETING 且 已经创建WaitNode,但还没有入队 else if (!queued) /** * 1、将当前waiters赋值给q.next,即“q-->当前waiters” * 2、CAS,将waiters属性,从“当前waiters-->q” * 所以后等待的会排在链表的前面,而任务完成时会从链表前面开始依次唤醒等待线程 */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //所有准备工作完成,判断等待是否需要计时 else if (timed) { nanos = deadline - System.nanoTime(); //如果已经等待超时,remove当前WaiterNode if (nanos <= 0L) { removeWaiter(q); //等待超时的情况下,从waiters链表中删除q return state; } LockSupport.parkNanos(this, nanos); //挂起一段时间 } else LockSupport.park(this); //一直挂起,等待唤醒 } } 1、判断主线程是否被中断,如果被中断,将当前WaitNode节点从waiters链表中删除,并上抛InterruptedException 2、如果任务已经完成(可能是正常完成、异常、中断),直接返回(即还没有开始等待,任务已经完成了,就返回了) 3、如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL 4、如果任务没有被中断,也没有完成,new WaitNode() 5、如果任务没有被中断,也没有完成,也创建了WaitNode,使用UNSAFE.CAS()操作将WaitNode加入waiters链表 6、所有准备工作完毕,通过LockSupport的park或parkNanos挂起线程 而WaitNode就是一个简单的链表节点,记录这等待的线程和下一个WaitNode /** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ static final class WaitNode { volatile Thread thread; //等待的线程 volatile WaitNode next; //下一个WaitNode WaitNode() { thread = Thread.currentThread(); } } FutureTask.cancel()的实现 public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //中断线程 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; } 1、如果任务不是运行状态,直接返回false失败 2、如果mayInterruptIfRunning==true,中断运行中的任务,使用CAS操作将状态NEW-->INTERRUPTING,再调用runner.interrupt(),最后将状态置为INTERRUPTED 3、如果mayInterruptIfRunning==false,将任务置为CANCELLED取消状态 4、调用finishCompletion()依次唤醒等待获取结果的线程,返回true取消成功 四、使用示例 import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestFuture { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); Task task = new Task(); //callable任务 Future<Integer> result = executor.submit(task); executor.shutdown(); (编辑:宿州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐