非同期処理でWebコンテンツをダウンロードする方法 - Groovy, Scala, F#

前回(id:fits:20110925)、並列コレクション等で Web コンテンツをダウンロードする処理を実装してみましたが、今回はその非同期処理版を Groovy, Scala, F# で実装してみました。(主な仕様は前回と同じ)

実行例
groovy async_download_web.groovy destdir < urls.txt

今回、非同期処理のために使用した機能は以下の通りです。

  • Groovy : GPars
  • Scala : 限定継続 + Actor
  • F# : 非同期ワークフロー

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20111016/

Groovy の場合: GPars

非同期処理でも GPars が使えます。

GParsPool.withPool や GParsExecutorsPool.withPool 内*1で、クロージャに対して async() を実行すると非同期化されたクロージャを取得できます。

非同期化されたクロージャを実行すると Future が返り、Future に対して get() を呼び出す事で処理結果を取得します。(ただし、処理が完了していないと完了するまで待ちになる)
非同期化されたクロージャ内で発生した例外も get() 実行時に throw されます。


今回は指定の URL に接続する処理とダウンロードしてローカルファイルを作成する処理を非同期化し、全 URL を非同期で処理した後にループで 1件ずつ完了待ちするようにしてみました。
ダウンロードを完了したものからでは無く、1件ずつ順番に処理結果待ちと文字列出力("downloaded: ・・・" の出力)を行っているため、ダウンロード処理の遅いものが途中にあると文字列出力が停滞してしまう点は改善が必要かと思います。(ちなみに、each の代わりに eachParallel にすると URL 接続に失敗する等で "failed: ・・・" が出力された際にスクリプトが終わらなくなる等の不都合がありました)

また、GParsExecutorsPool よりも GParsPool の方が速く感じたので、今回は GParsPool を使っています。(withPool も数値を指定した方が速かった)

  • Groovy 1.8.2
async_download_web.groovy
import groovyx.gpars.*

def dir = args[0]

GParsPool.withPool(10) {
    //非同期化したURL接続処理
    def openUrl = { it.newInputStream() }.async()
    //非同期化したダウンロード処理
    def downloadUrl = { f, ou -> f.bytes = ou.get().bytes }.async()

    //全 URL に対して非同期処理を実行
    System.in.readLines() collect {
        def url = new URL(it)
        def file = new File(dir, new File(url.file).name)

        //downloadUrl や openUrl は Future を直ぐに返し、
        //非同期的に実行される
        [url: url, file: file, result: downloadUrl(file, openUrl(url))]

    } each {
        try {
            //1件ずつ処理の完了待ち
            it.result.get()
            println "downloaded: ${it.url} => ${it.file}"
        } catch(e) {
            println "failed: ${it.url}, $e"
        }
    }
}

Scala の場合: 限定継続 + Actor

今回、Scala では限定継続と Actor を使って非同期処理を実装してみました。(限定継続の処理内容に関しては id:fits:20100207 参照)

Groovy 等と比べるとコード量は多くなりますが、継続オブジェクト(下記サンプルでは k の箇所)を Actor に渡す事で非同期処理を同期的に実装できるようになります。

下記サンプルでは、(1) 〜 (6) のような処理の流れが非同期的に実行されるようになっています。(foreach はすぐに完了し Actor 処理の実行/完了待ちになります)

非同期化している箇所は Groovy 版のサンプルとほぼ同じですが、ダウンロードが完了したものから処理結果を文字列出力するようになっています。

  • Scala 2.9.1(JavaSE 7 依存)

なお、Scala 2.9.1 で限定継続を使う場合は実行時に -P:continuations:enable オプション指定が必要になります。

実行例(限定継続を有効化)
scala -P:continuations:enable async_download_web.scala destdir < urls.txt
async_download_web.scala
import scala.actors.Actor
import scala.actors.Actor._
import scala.io.Source
import scala.util.continuations._

import java.io.{InputStream, File}
import java.net.URL
import java.nio.file.{Paths, Files, Path}
import java.nio.file.StandardCopyOption._

val using = (st: InputStream) => (block: InputStream => Unit) => try {block(st)} finally {st.close()}

//URL接続用パラメータ
case class URLOpen(val url: URL, val k: (InputStream => Unit))
//ダウンロード用パラメータ
case class URLDownload(val url: URL, val stream: InputStream, 
                val destDir: String, val k: (Path => Unit))

class URLActor extends Actor {
    def act() {
        loop {
            react {
                //URL接続処理 (2)
                case uo: URLOpen => {
                    try {
                        //継続(uo.k)を実行することで
                        //val stream に openStream の結果が設定され、
                        // (3) から処理が継続される
                        uo.k(uo.url.openStream())
                    }
                    catch {
                        //例外発生時は結果を出力して Actor を停止
                        case e: Exception => failStop(e, uo.url)
                    }
                }
                //ダウンロード処理 (5)
                case rs: URLDownload => {
                    val f = new File(rs.url.getFile()).getName()
                    val filePath = Paths.get(rs.destDir, f)

                    try {
                        using (rs.stream) {stream =>
                            Files.copy(stream, filePath, REPLACE_EXISTING)
                        }
                        //継続(rs.k)を実行することで
                        //val file に filePath が設定され、
                        // (6) から処理が継続される
                        rs.k(filePath)
                    }
                    catch {
                        //例外発生時は結果を出力して Actor を停止
                        case e: Exception => failStop(e, rs.url)
                    }
                }
            }
        }
    }

    def stop() {
        exit
    }

    def failStop(e: Exception, url: URL) {
        printf("failed: %s, %s\n", url, e)
        exit
    }
}

val dir = args(0)

Source.stdin.getLines.toList.foreach {u =>
    val url = new URL(u)
    reset {
        val actor = new URLActor()
        //Actor を開始
        actor.start

        //URL接続処理
        val stream = shift {k: (InputStream => Unit) =>
            //URL接続処理の非同期呼び出し (1)
            actor ! URLOpen(url, k)
        }
        // (3)

        //ダウンロード処理
        val file = shift {k: (Path => Unit) =>
            //ダウンロード処理の非同期呼び出し (4)
            actor ! URLDownload(url, stream, dir, k)
        }
        // (6)

        printf("downloaded: %s => %s\n", url, file)
        //Actor を停止
        actor.stop
    }
}

F# の場合: 非同期ワークフロー

F# では非同期ワークフローという機能が用意されているので、比較的容易に非同期処理を同期的に実装できます。

非同期ワークフローでは、async ブロック内に非同期処理を実装、非同期処理の呼び出し箇所で let! や do! のような ! を付けた命令を使用し、あとは async ブロックを Async.RunSynchronously メソッド等に渡すだけで非同期処理を実装できます。

下記サンプルでは、Groovy や Scala のサンプルとは異なり、Web コンテンツの読み込みとローカルファイルへの書き込み処理もそれぞれ非同期処理しています。

また、Async.Parallel で並列化、Async.RunSynchronously で処理が全て完了するまで終了しないようにしており、ignore はビルド時にワーニングが出るのを防ぐために使っています。(Async.RunSynchronously の戻り値を無視するため)

async_download_web.fs
open System
open System.IO
open System.Net

[<EntryPoint>]
let main(args: string[]) = 
    let downloadFile (dir: string) (url: string) = 
        async {
            try
                let req = WebRequest.Create(url)
                //URL接続(非同期)
                let! res = req.AsyncGetResponse()

                let fileName = Path.Combine(dir, Path.GetFileName(url))

                use stream = res.GetResponseStream()
                use fs = new FileStream(fileName, FileMode.Create)

                //Webコンテンツの読み込み(非同期)
                let! buf = stream.AsyncRead(int res.ContentLength)
                //ローカルファイルへの書き込み(非同期)
                do! fs.AsyncWrite(buf, 0, buf.Length)

                stdout.WriteLine("downloaded: {0} => {1}", url, fileName)
            with
            | _ as e -> stdout.WriteLine("failed: {0}, {1}", url, e.Message)
        }

    stdin.ReadToEnd().Split([| Environment.NewLine |], StringSplitOptions.RemoveEmptyEntries)
        |> Array.map (downloadFile args.[0])
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

    0

*1:厳密には withPool に渡すクロージャ内