読者です 読者をやめる 読者になる 読者になる

並列処理でWebコンテンツをダウンロードする方法 - Groovy, Scala, C#, Java, Ruby

複数のWebコンテンツ(HTMLや画像など)をダウンロードする際に 1件ずつ処理していたのでは非効率です。
というわけで、並列的にWebコンテンツをダウンロードするプログラムを Groovy, Scala, C#, Java, Ruby で実装してみました。

主な仕様は以下で、外部ライブラリを使用せずに実装しました。

  • 実行時の第1引数で出力先ディレクトリを指定
  • ダウンロード対象の URL を標準入力で指定(改行区切りで複数指定)
  • URL 内のファイル名を出力ファイル名として使用
実行例
groovy download_web.groovy destdir < urls.txt

サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20110925/

Groovy の場合

Groovy 1.8 では GPars が同梱されているので、GPars による並列コレクションを使えば簡単に実装できます。
GParsExecutorsPool.withPool に渡したクロージャ内のコレクションで並列処理用のメソッド(下記の eachParallel)が使えるようになります。

  • Groovy 1.8.2
download_web.groovy
import groovyx.gpars.GParsExecutorsPool

def dir = args[0]

GParsExecutorsPool.withPool {
//並列数を固定化するなら以下のようにする
//GParsExecutorsPool.withPool(5) {

    System.in.readLines() eachParallel {u ->
        def url = new URL(u)

        try {
            def file =  new File(dir, new File(url.file).name)

            url.withInputStream {input ->
                file.bytes = input.bytes
            }

            println "downloaded: $url => $file"
        }
        catch (e) {
            println "failed: $url, $e"
        }
    }
}

Scala の場合

Scala 2.9 では並列コレクションが使えます。
コレクションに対して par メソッドを呼び出すと並列コレクション化され、後は foreach 等を実行すれば並列に処理されます。

ただし、デフォルトでは JVM が使用できるプロセッサ数※までしか並列化されないようなので、今回のような用途では並列数が少ないかもしれません。

※ scala.collection.parallel.availableProcessors で数値を参照可
   実際には java.lang.Runtime.getRuntime().availableProcessors() の値が設定されている

なお、ファイル保存処理を簡単に実装するため、JavaSE 7 で導入された java.nio.file.Files クラス等を使用しています。

  • Scala 2.9.1(JavaSE 7 依存)
download_web_scala
import scala.io.Source

import java.io.File
import java.net.URL
import java.nio.file.{Paths, Files}
import java.nio.file.StandardCopyOption._

val dir = args(0)

val using = (st: InputStream) => (block: InputStream => Unit) => try {block(st)} finally {st.close()}

Source.stdin.getLines.toArray.par.foreach {u =>
    val url = new URL(u)
    val filePath = Paths.get(dir, new File(url.getFile()).getName())

    try {
        using (url.openStream()) {stream =>
            Files.copy(stream, filePath, REPLACE_EXISTING)
        }

        printf("downloaded: %s => %s\n", url, filePath)
    } catch {
        case e: Exception => printf("failed: %s, %s\n", url, e)
    }
}

C# の場合

.NET Framework 4 では並列タスクが使えます。
Parallel.ForEach にコレクションとその処理内容を渡せば並列化されます。

DownloadWeb.cs
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;

public class DownloadWeb
{
    public static void Main(string[] args)
    {
        var urls = Console.In.ReadToEnd().Split(new string[]{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries);

        var dir = args[0];

        Parallel.ForEach(urls, (u) => {
            var url = new Uri(u);

            try {
                var filePath = Path.Combine(dir, Path.GetFileName(url.LocalPath));
                new WebClient().DownloadFile(url, filePath);

                Console.WriteLine("downloaded: {0} => {1}", url, filePath);
            }
            catch (Exception e) {
                Console.WriteLine("failed: {0}, {1}", url, e);
            }
        });
    }
}

Java の場合

Java の場合、今のところ並列コレクション等の仕組みが用意されていないようなので Concurrency Utilities を使って実装しました。
ファイルの保存処理には JavaSE 7 で導入された java.nio.file.Files クラス等を使用しています。

下記では URL クラスの代わりに URI を使っていますが、特に深い理由は無く Scala のサンプルと同様に URL クラスを使っても問題ありません。


なお、Paths.get() の引数に URI を渡せますが、現バージョンでは "http://・・・" から作成した URI を渡す事はできませんでした。(java.nio.file.FileSystemNotFoundException: Provider "http" not installed となる)

  • JavaSE 7
DownloadWeb.java
import java.io.*;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class DownloadWeb {
    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        ExecutorService exec = Executors.newCachedThreadPool();
        //並列数を固定化するなら以下のようにする
        //ExecutorService exec = Executors.newFixedThreadPool(5);

        final String dir = args[0];
        String url = null;

        while ((url = reader.readLine()) != null) {
            final URI uri = URI.create(url);
            final Path filePath = Paths.get(dir, new File(uri.getPath()).getName());

            exec.submit(new Runnable() {
                @Override
                public void run() {
                    try (InputStream in = uri.toURL().openStream()) {
                        Files.copy(in, filePath, StandardCopyOption.REPLACE_EXISTING);
                        System.out.printf("downloaded: %s => %s\n", uri, filePath);
                    } catch (Exception e) {
                        System.out.printf("failed: %s, %s\n", uri, e);
                    }
                }
            });
        }
        //ダウンロード終了まで待機
        exec.shutdown();
    }
}

Ruby の場合

Ruby の場合も今のところ並列コレクション等の仕組みが用意されていないみたいなので Queue と Thread を使って実装してみました。

スレッド数を固定化しているので、Java で Executors.newFixedThreadPool(数値) を使ったケースや Groovy で GParsExecutorsPool.withPool(数値) {・・・} を使ったケースと同様の処理になると思います。

download_web.rb
require "thread"
require "uri"
require "net/http"

#並列数(スレッド数)
poolSize = 5

dir = ARGV[0]

q = Queue.new
#キューに URL を設定
$stdin.readlines.each {|l| q.push(l.chomp)}

threads = []
poolSize.times do
    threads << Thread.start(q) do |tq|
        #キューが空になるまでループ
        while not q.empty?
            #キューから URL 取り出し
            u = q.pop(true)

            begin
                url = URI.parse(u)
                filePath = File.join(dir, File.basename(url.path))

                res = Net::HTTP.get_response(url)
                open(filePath, 'wb') {|f| f.puts res.body}

                puts "downloaded: #{url} => #{filePath}"
            rescue => e
                puts "failed: #{url}, #{e}"
            end
        end
    end
end

#ダウンロード終了まで待機
threads.each {|t| t.join}