非同期処理でWebコンテンツをダウンロードする方法2 - Groovy, Scala, Java, C#

今回は、前回(id:fits:20111016)と同様の非同期ダウンロード処理を JavaC# で実装し、Groovy と Scala は別の実装方法を模索してみました。

使用した機能は以下の通りです。

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111025/

Groovy の場合2 : Actor (GPars)

今回は GPars の Actor を使って実装してみました。
react を多段にして処理をつなげ、例外発生時は onException で一括処理するようにしています。((1) 〜 (4) の順に非同期処理される)

  • Groovy 1.8.2
async_download_web2.groovy
import groovyx.gpars.actor.*

def dir = args[0]

System.in.readLines() collect {u ->
    def download  = Actors.actor {
        def url

        //例外発生時の処理
        delegate.metaClass.onException = {
            println "failed: ${url}, ${it}"
        }

        react {urlString ->
            //URL接続開始 (2)
            url = new URL(urlString)
            //Actor(自分)へのメッセージ送信 (3)
            send url.openStream()

            react {stream ->
                //ダウンロード処理開始 (4)
                def file = new File(dir, new File(url.file).name)
                file.bytes = stream.bytes

                println "downloaded: ${url} => ${file}"
            }
        }
    }

    //Actor へのメッセージ送信 (1)
    download.send u
    download
} each {
    //処理の完了待ち
    it.join()
}

Scala の場合2 : 限定継続 + ops

前回(id:fits:20111016)の Actor 部分を ops.spawn で置き換えてみました。

  • Scala 2.9.1(JavaSE 7 依存)

前回と同様、実行時に -P:continuations:enable オプション指定が必要になります。

実行例(限定継続を有効化)
scala -P:continuations:enable async_download_web2.scala destdir < urls.txt
async_download_web2.scala
import scala.concurrent.ops
import scala.util.continuations._
import scala.io.Source

import java.io.{InputStream, File}
import java.net.URL
import java.nio.file.{Paths, Files, Path}
import java.nio.file.StandardCopyOption._

val dir = args(0)

val using = (st: InputStream) => (block: InputStream => Unit) => try {block(st)} finally {st.close()}

Source.stdin.getLines.toList.foreach {u =>
    val url = new URL(u)

    reset {
        //URL接続処理
        val stream = shift {k: (InputStream => Unit) =>
            //非同期実行
            ops.spawn {
                try {
                    //継続の呼び出し:(1) から処理を継続
                    k(url.openStream())
                }
                catch {
                    case e: Exception => printf("failed: %s, %s\n", url, e)
                }
            }
        }
        //(1)

        //ダウンロード処理
        val file = shift {k: (Path => Unit) =>
            //非同期実行
            ops.spawn {
                val f = new File(url.getFile()).getName()
                val filePath = Paths.get(dir, f)

                try {
                    using (stream) {st =>
                        Files.copy(st, filePath, REPLACE_EXISTING)
                    }
                    //継続の呼び出し:(2) から処理を継続
                    k(filePath)
                }
                catch {
                    case e: Exception => printf("failed: %s, %s\n", url, e)
                }
            }
        }
        //(2)

        printf("downloaded: %s => %s\n", url, file)
    }
}

Java の場合 : Concurrency Utilities

Concurrency Utilities の Future を使って実装してみました。
処理的には前回の Groovy 版に似ていると思います。

  • JavaSE 7.0u1
AsyncDownloadWeb.java
import java.io.*;
import java.util.*;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static java.nio.file.StandardCopyOption.*;

public class AsyncDownloadWeb {
    public static void main(String[] args) throws Exception {

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        ExecutorService exec = Executors.newCachedThreadPool();

        final String dir = args[0];
        String urlString = null;

        while ((urlString = reader.readLine()) != null) {
            final URI uri = URI.create(urlString);

            //URL接続処理の非同期実行
            final Future<InputStream> stream = exec.submit(new Callable<InputStream>() {
                public InputStream call() throws Exception {
                    return uri.toURL().openStream();
                }
            });

            //ダウンロード処理の非同期実行
            final Future<Path> file = exec.submit(new Callable<Path>() {
                public Path call() throws Exception {
                    String fileName = new File(uri.getPath()).getName();
                    Path filePath = Paths.get(dir, fileName);

                    try (InputStream in = stream.get()) {
                        Files.copy(in, filePath, REPLACE_EXISTING);
                    }
                    return filePath;
                }
            });

            //結果出力
            exec.submit(new Runnable() {
                public void run() {
                    try {
                        System.out.printf("downloaded: %s => %s\n", uri, file.get());
                    } catch (Exception ex) {
                        System.out.printf("failed: %s, %s\n", uri, ex);
                    }
                }
            });
        }

        //全処理の完了待ち
        exec.shutdown();
    }
}

C# の場合 : TPL + EAP

最初、Async CTP を使って F# と同じような処理を C# で実装するつもりだったのですが、Async CTP がまともにインストールできなかったので*1、タスク並列ライブラリ(TPL)と従来のイベントベース非同期パターン(EAP)を組み合わせて実装してみました。

TPL の TaskCompletionSource を使うと EAP の処理を Task 化できるので、下記サンプルでは DownloadFileAsync のイベント処理を Task 化し、処理の完了待ちに使用しています。(TrySetResult で設定した処理結果は特に使っていません)

AsyncDownloadWeb.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;

public class AsyncDownloadWeb
{
    public static void Main(string[] args)
    {
        var urls = Console.In.ReadToEnd().Split(new string[]{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries);

        var dir = args[0];
        var taskList = new List<Task<bool>>(urls.Length);

        foreach (var u in urls)
        {
            var wc = new WebClient();
            var uri = new Uri(u);
            var fileName = Path.Combine(dir, Path.GetFileName(u));

            TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

            //非同期ダウンロード完了時のイベント処理
            wc.DownloadFileCompleted += (sender, e) =>
            {
                if (e.Error != null)
                {
                    //エラー発生時の処理
                    Console.WriteLine("failed: {0}, {1}", uri, e.Error.Message);
                    //処理結果の設定
                    tcs.TrySetResult(false);
                }
                else
                {
                    //成功時の処理
                    Console.WriteLine("downloaded: {0} => {1}", uri, fileName);
                    //処理結果の設定
                    tcs.TrySetResult(true);
                }
            };

            //非同期ダウンロード開始
            wc.DownloadFileAsync(uri, fileName);
            //Task 化したオブジェクトを追加
            taskList.Add(tcs.Task);
        }

        //全処理の完了待ち
        Task.WaitAll(taskList.ToArray());
    }
}

*1:インストールは一応完了するが、必要なファイルが正常にインストールされなかった