加入收藏 | 设为首页 | 会员中心 | 我要投稿 宿州站长网 (https://www.0557zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Java并发编程 - Callable、Future和FutureTask的达成

发布时间:2021-12-05 13:12:10 所属栏目:教程 来源:互联网
导读:启动线程执行任务,如果需要在任务执行完毕之后得到任务执行结果,可以使用从Java 1.5开始提供的Callable和Future 下面就分析一下Callable、Future以及FutureTask的具体实现及使用方法 源码分析基于JDK 1.7 一、Callable 与 Runnable java.lang.Runnable是一

    boolean queued = false;
    for (;;) {
        //如果主线程已经被中断,removeWaiter(),并上抛InterruptedException
        //注意:Thread.interrupted()后会导致线程的中断状态为false
        if (Thread.interrupted()) {
            removeWaiter(q); //线程被中断的情况下,从waiters链表中删除q
            throw new InterruptedException();
        }
 
        int s = state;
        //如果任务已经完成(可能是正常完成、异常、中断),直接返回,即还没有开始等待,任务已经完成了
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //s<COMPLETING 且 还没有创建WaitNode
        else if (q == null)
            q = new WaitNode();
        //s<COMPLETING 且 已经创建WaitNode,但还没有入队
        else if (!queued)
            /**
             * 1、将当前waiters赋值给q.next,即“q-->当前waiters”
             * 2、CAS,将waiters属性,从“当前waiters-->q”
             * 所以后等待的会排在链表的前面,而任务完成时会从链表前面开始依次唤醒等待线程
             */
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //所有准备工作完成,判断等待是否需要计时
        else if (timed) {
            nanos = deadline - System.nanoTime();
            //如果已经等待超时,remove当前WaiterNode
            if (nanos <= 0L) {
                removeWaiter(q); //等待超时的情况下,从waiters链表中删除q
                return state;
            }
            LockSupport.parkNanos(this, nanos); //挂起一段时间
        }
        else
            LockSupport.park(this); //一直挂起,等待唤醒
    }
}
1、判断主线程是否被中断,如果被中断,将当前WaitNode节点从waiters链表中删除,并上抛InterruptedException
2、如果任务已经完成(可能是正常完成、异常、中断),直接返回(即还没有开始等待,任务已经完成了,就返回了)
3、如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL
4、如果任务没有被中断,也没有完成,new WaitNode()
5、如果任务没有被中断,也没有完成,也创建了WaitNode,使用UNSAFE.CAS()操作将WaitNode加入waiters链表
6、所有准备工作完毕,通过LockSupport的park或parkNanos挂起线程
 
 
 
而WaitNode就是一个简单的链表节点,记录这等待的线程和下一个WaitNode
 
/**
 * Simple linked list nodes to record waiting threads in a Treiber
 * stack.  See other classes such as Phaser and SynchronousQueue
 * for more detailed explanation.
 */
static final class WaitNode {
    volatile Thread thread; //等待的线程
    volatile WaitNode next; //下一个WaitNode
    WaitNode() { thread = Thread.currentThread(); }
}
 
 
FutureTask.cancel()的实现
public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    
    if (mayInterruptIfRunning) {
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        
        Thread t = runner;
        if (t != null)
            t.interrupt(); //中断线程
        
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    
    finishCompletion();
    
    return true;
}
1、如果任务不是运行状态,直接返回false失败
2、如果mayInterruptIfRunning==true,中断运行中的任务,使用CAS操作将状态NEW-->INTERRUPTING,再调用runner.interrupt(),最后将状态置为INTERRUPTED
3、如果mayInterruptIfRunning==false,将任务置为CANCELLED取消状态
4、调用finishCompletion()依次唤醒等待获取结果的线程,返回true取消成功
 
 
四、使用示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class TestFuture {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Task task = new Task(); //callable任务
        Future<Integer> result = executor.submit(task);
        executor.shutdown();

(编辑:宿州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章