Java8并發教程:Threads和Executors

歡迎閱讀我的Java8并發教程的第一部分。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進行并發編程。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學會如何通過線程,任務(tasks)和 exector services來并行執行代碼。

  • 第一部分:Threads和Executors

  • 第二部分:同步和鎖

并發在Java5中首次被引入并在后續的版本中不斷得到增強。在這篇文章中介紹的大部分概念同樣適用于以前的Java版本。不過我的代碼示例聚焦于Java8,大量使用lambda表達式和其他新特性。如果你對lambda表達式不屬性,我推薦你首先閱讀我的Java 8 教程。

Threads 和 Runnables

所有的現代操作系統都通過進程和線程來支持并發。進程是通常彼此獨立運行的程序的實例,比如,如果你啟動了一個Java程序,操作系統產生一個新的進程,與其他程序一起并行執行。在這些進程的內部,我們使用線程并發執行代碼,因此,我們可以最大限度的利用CPU可用的核心(core)。

Java從JDK1.0開始執行線程。在開始一個新的線程之前,你必須指定由這個線程執行的代碼,通常稱為task。這可以通過實現Runnable——一個定義了一個無返回值無參數的run()方法的函數接口,如下面的代碼所示:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");

因為Runnable是一個函數接口,所以我們利用lambda表達式將當前的線程名打印到控制臺。首先,在開始一個線程前我們在主線程中直接運行runnable。

控制臺輸出的結果可能像下面這樣:

Hello main
Hello Thread-0
Done!

或者這樣:

Hello main
Done!
Hello Thread-0

由于我們不能預測這個runnable是在打印’done’前執行還是在之后執行。順序是不確定的,因此在大的程序中編寫并發程序是一個復雜的任務。

我們可以將線程休眠確定的時間。在這篇文章接下來的代碼示例中我們可以通過這種方法來模擬長時間運行的任務。

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();

當你運行上面的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鐘的延遲。TimeUnit在處理單位時間時一個有用的枚舉類。你可以通過調用Thread.sleep(1000)來達到同樣的目的。

使用Thread類是很單調的且容易出錯。由于并發API在2004年Java5發布的時候才被引入。這些API位于java.util.concurrent包下,包含很多處理并發編程的有用的類。自從這些并發API引入以來,在隨后的新的Java版本發布過程中得到不斷的增強,甚至Java8提供了新的類和方法來處理并發。

接下來,讓我們走進并發API中最重要的一部——executor services。

Executors

并發API引入了ExecutorService作為一個在程序中直接使用Thread的高層次的替換方案。Executos支持運行異步任務,通常管理一個線程池,這樣一來我們就不需要手動去創建新的線程。在不斷地處理任務的過程中,線程池內部線程將會得到復用,因此,在我們可以使用一個executor service來運行和我們想在我們整個程序中執行的一樣多的并發任務。

下面是使用executors的第一個代碼示例:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);

});

// => Hello pool-1-thread-1

Executors類提供了便利的工廠方法來創建不同類型的 executor services。在這個示例中我們使用了一個單線程線程池的 executor。

代碼運行的結果類似于上一個示例,但是當運行代碼時,你會注意到一個很大的差別:Java進程從沒有停止!Executors必須顯式的停止-否則它們將持續監聽新的任務。

ExecutorService提供了兩個方法來達到這個目的——shutdwon()會等待正在執行的任務執行完而shutdownNow()會終止所有正在執行的任務并立即關閉execuotr。

這是我喜歡的通常關閉executors的方式:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    }
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}

executor通過等待指定的時間讓當前執行的任務終止來“溫柔的”關閉executor。在等待最長5分鐘的時間后,execuote最終會通過中斷所有的正在執行的任務關閉。

Callables 和 Futures

除了Runnable,executor還支持另一種類型的任務——Callable。Callables也是類似于runnables的函數接口,不同之處在于,Callable返回一個值。

下面的lambda表達式定義了一個callable:在休眠一分鐘后返回一個整數。

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};

Callbale也可以像runnbales一樣提交給 executor services。但是callables的結果怎么辦?因為submit()不會等待任務完成,executor service不能直接返回callable的結果。不過,executor 可以返回一個Future類型的結果,它可以用來在稍后某個時間取出實際的結果。

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在將callable提交給exector之后,我們先通過調用isDone()來檢查這個future是否已經完成執行。我十分確定這會發生什么,因為在返回那個整數之前callable會休眠一分鐘、

在調用get()方法時,當前線程會阻塞等待,直到callable在返回實際的結果123之前執行完成。現在future執行完畢,我們可以在控制臺看到如下的結果:

future done? false
future done? true
result: 123

Future與底層的executor service緊密的結合在一起。記住,如果你關閉executor,所有的未中止的future都會拋出異常。

executor.shutdownNow();
future.get();

你可能注意到我們這次創建executor的方式與上一個例子稍有不同。我們使用newFixedThreadPool(1)來創建一個單線程線程池的 execuot service。 這等同于使用newSingleThreadExecutor不過使用第二種方式我們可以稍后通過簡單的傳入一個比1大的值來增加線程池的大小。

Timeouts

任何future.get()調用都會阻塞,然后等待直到callable中止。在最糟糕的情況下,一個callable持續運行——因此使你的程序將沒有響應。我們可以簡單的傳入一個時長來避免這種情況。

ExecutorService executor = Executors.newFixedThreadPool(1);

    Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

    future.get(1, TimeUnit.SECONDS);

運行上面的代碼將會產生一個TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)

你可能已經猜到為什么會拋出這個異常。我們指定的最長等待時間為1分鐘,而這個callable在返回結果之前實際需要兩分鐘。

invokeAll

Executors支持通過invokeAll()一次批量提交多個callable。這個方法結果一個callable的集合,然后返回一個future的列表。

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);

在這個例子中,我們利用Java8中的函數流(stream)來處理invokeAll()調用返回的所有future。我們首先將每一個future映射到它的返回值,然后將每個值打印到控制臺。如果你還不屬性stream,可以閱讀我的Java8 Stream 教程。

invokeAny

批量提交callable的另一種方式就是invokeAny(),它的工作方式與invokeAll()稍有不同。在等待future對象的過程中,這個方法將會阻塞直到第一個callable中止然后返回這一個callable的結果。

為了測試這種行為,我們利用這個幫助方法來模擬不同執行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結果。

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

 

我們利用這個方法創建一組callable,這些callable擁有不同的執行時間,從1分鐘到3分鐘。通過invokeAny()將這些callable提交給一個executor,返回最快的callable的字符串結果-在這個例子中為任務2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面這個例子又使用了另一種方式來創建executor——調用newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool類型的 executor,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的線程池不同,ForkJoinPools使用一個并行因子數來創建,默認值為主機CPU的可用核心數。

ForkJoinPools 在Java7時引入,將會在這個系列后面的教程中詳細講解。讓我們深入了解一下 scheduled executors 來結束本次教程。

Scheduled Executors

我們已經學習了如何在一個 executor 中提交和運行一次任務。為了持續的多次執行常見的任務,我們可以利用調度線程池。

ScheduledExecutorService支持任務調度,持續執行或者延遲一段時間后執行。

下面的實例,調度一個任務在延遲3分鐘后執行:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

調度一個任務將會產生一個專門的future類型——ScheduleFuture,它除了提供了Future的所有方法之外,他還提供了getDelay()方法來獲得剩余的延遲。在延遲消逝后,任務將會并發執行。

為了調度任務持續的執行,executors 提供了兩個方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一個方法用來以固定頻率來執行一個任務,比如,下面這個示例中,每分鐘一次:

ScheduledExecutorService executor =     Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,這個方法還接收一個初始化延遲,用來指定這個任務首次被執行等待的時長。

請記?。?code>scheduleAtFixedRate()并不考慮任務的實際用時。所以,如果你指定了一個period為1分鐘而任務需要執行2分鐘,那么線程池為了性能會更快的執行。

在這種情況下,你應該考慮使用scheduleWithFixedDelay()。這個方法的工作方式與上我們上面描述的類似。不同之處在于等待時間 period 的應用是在一次任務的結束和下一個任務的開始之間。例如:

ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

這個例子調度了一個任務,并在一次執行的結束和下一次執行的開始之間設置了一個1分鐘的固定延遲。初始化延遲為0,任務執行時間為0。所以我們分別在0s,3s,6s,9s等間隔處結束一次執行。如你所見,scheduleWithFixedDelay()在你不能預測調度任務的執行時長時是很有用的。

譯者:張坤

上一篇: 高階函數和Java的Lambda

下一篇: MyEclipse及Java基礎增強

分享到: 更多
重庆时时全天计划单期 麻将胡牌牌型大全集图 正常牌怎么看生死门 博士江西时时软件 做啥生意稳赚不赔 飞艇计划全天免费软件网页版 时时彩平投1:1盈利技巧 双色球杀红绝招超准 竟采比分网 北京pk走势图 手机重庆计划软件免费版 心水 新时时和老时时有什么区别 欧洲百万彩计划软件 十三幺怎么胡 安徽时时计划软件手机版式