非同期処理でWebコンテンツをダウンロードする方法2 - Groovy, Scala, Java, C#
今回は、前回(id:fits:20111016)と同様の非同期ダウンロード処理を Java と C# で実装し、Groovy と Scala は別の実装方法を模索してみました。
使用した機能は以下の通りです。
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111025/
Groovy の場合2 : Actor (GPars)
今回は GPars の Actor を使って実装してみました。
react を多段にして処理をつなげ、例外発生時は onException で一括処理するようにしています。((1) 〜 (4) の順に非同期処理される)
- Groovy 1.8.2
async_download_web2.groovy
import groovyx.gpars.actor.* def dir = args[0] System.in.readLines() collect {u -> def download = Actors.actor { def url //例外発生時の処理 delegate.metaClass.onException = { println "failed: ${url}, ${it}" } react {urlString -> //URL接続開始 (2) url = new URL(urlString) //Actor(自分)へのメッセージ送信 (3) send url.openStream() react {stream -> //ダウンロード処理開始 (4) def file = new File(dir, new File(url.file).name) file.bytes = stream.bytes println "downloaded: ${url} => ${file}" } } } //Actor へのメッセージ送信 (1) download.send u download } each { //処理の完了待ち it.join() }
Scala の場合2 : 限定継続 + ops
前回(id:fits:20111016)の Actor 部分を ops.spawn で置き換えてみました。
- Scala 2.9.1(JavaSE 7 依存)
前回と同様、実行時に -P:continuations:enable オプション指定が必要になります。
実行例(限定継続を有効化)
scala -P:continuations:enable async_download_web2.scala destdir < urls.txt
async_download_web2.scala
import scala.concurrent.ops import scala.util.continuations._ import scala.io.Source import java.io.{InputStream, File} import java.net.URL import java.nio.file.{Paths, Files, Path} import java.nio.file.StandardCopyOption._ val dir = args(0) val using = (st: InputStream) => (block: InputStream => Unit) => try {block(st)} finally {st.close()} Source.stdin.getLines.toList.foreach {u => val url = new URL(u) reset { //URL接続処理 val stream = shift {k: (InputStream => Unit) => //非同期実行 ops.spawn { try { //継続の呼び出し:(1) から処理を継続 k(url.openStream()) } catch { case e: Exception => printf("failed: %s, %s\n", url, e) } } } //(1) //ダウンロード処理 val file = shift {k: (Path => Unit) => //非同期実行 ops.spawn { val f = new File(url.getFile()).getName() val filePath = Paths.get(dir, f) try { using (stream) {st => Files.copy(st, filePath, REPLACE_EXISTING) } //継続の呼び出し:(2) から処理を継続 k(filePath) } catch { case e: Exception => printf("failed: %s, %s\n", url, e) } } } //(2) printf("downloaded: %s => %s\n", url, file) } }
Java の場合 : Concurrency Utilities
Concurrency Utilities の Future を使って実装してみました。
処理的には前回の Groovy 版に似ていると思います。
- JavaSE 7.0u1
AsyncDownloadWeb.java
import java.io.*; import java.util.*; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import static java.nio.file.StandardCopyOption.*; public class AsyncDownloadWeb { public static void main(String[] args) throws Exception { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); ExecutorService exec = Executors.newCachedThreadPool(); final String dir = args[0]; String urlString = null; while ((urlString = reader.readLine()) != null) { final URI uri = URI.create(urlString); //URL接続処理の非同期実行 final Future<InputStream> stream = exec.submit(new Callable<InputStream>() { public InputStream call() throws Exception { return uri.toURL().openStream(); } }); //ダウンロード処理の非同期実行 final Future<Path> file = exec.submit(new Callable<Path>() { public Path call() throws Exception { String fileName = new File(uri.getPath()).getName(); Path filePath = Paths.get(dir, fileName); try (InputStream in = stream.get()) { Files.copy(in, filePath, REPLACE_EXISTING); } return filePath; } }); //結果出力 exec.submit(new Runnable() { public void run() { try { System.out.printf("downloaded: %s => %s\n", uri, file.get()); } catch (Exception ex) { System.out.printf("failed: %s, %s\n", uri, ex); } } }); } //全処理の完了待ち exec.shutdown(); } }
C# の場合 : TPL + EAP
最初、Async CTP を使って F# と同じような処理を C# で実装するつもりだったのですが、Async CTP がまともにインストールできなかったので*1、タスク並列ライブラリ(TPL)と従来のイベントベース非同期パターン(EAP)を組み合わせて実装してみました。
TPL の TaskCompletionSource を使うと EAP の処理を Task 化できるので、下記サンプルでは DownloadFileAsync のイベント処理を Task 化し、処理の完了待ちに使用しています。(TrySetResult で設定した処理結果は特に使っていません)
- C# 4.0 (.NET Framework 4.0)
AsyncDownloadWeb.cs
using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Threading.Tasks; public class AsyncDownloadWeb { public static void Main(string[] args) { var urls = Console.In.ReadToEnd().Split(new string[]{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries); var dir = args[0]; var taskList = new List<Task<bool>>(urls.Length); foreach (var u in urls) { var wc = new WebClient(); var uri = new Uri(u); var fileName = Path.Combine(dir, Path.GetFileName(u)); TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(); //非同期ダウンロード完了時のイベント処理 wc.DownloadFileCompleted += (sender, e) => { if (e.Error != null) { //エラー発生時の処理 Console.WriteLine("failed: {0}, {1}", uri, e.Error.Message); //処理結果の設定 tcs.TrySetResult(false); } else { //成功時の処理 Console.WriteLine("downloaded: {0} => {1}", uri, fileName); //処理結果の設定 tcs.TrySetResult(true); } }; //非同期ダウンロード開始 wc.DownloadFileAsync(uri, fileName); //Task 化したオブジェクトを追加 taskList.Add(tcs.Task); } //全処理の完了待ち Task.WaitAll(taskList.ToArray()); } }
*1:インストールは一応完了するが、必要なファイルが正常にインストールされなかった