イテレータのタイムアウト

たとえばこういう Ruby のコードがあって、

Net::HTTP.start(host, port) { |http|
  http.request(request) { |response|
    response.read_body { |chunk|
      func(chunk)  
    }
  }
}

read_body {|chunk| ... } というイテレータで、「エンティティボディを少しづつ取得して順次ブロックに与え」る、 つまり、ストリーミングで一行ずつ、データを chunk に入れてくれるわけなのだが、 twitter の streaming api が何らかの理由で止まったりして、再接続したいときがある。 その場合に Ruby の timeout というライブラリを使って、

Net::HTTP.start(host, port) { |http|
  timeout(TIMEOUT_SECS){ http.request(request) { |response|
    response.read_body { |chunk|
      func(chunk)  
    }
  }}
}

とやっても駄目で、

Net::HTTP.start(host, port) { |http|
  http.request(request) { |response|
    timeout(TIMEOUT_SECS) { response.read_body { |chunk|
      func(chunk)  
    }}
  }
}

とやっても駄目で、

Net::HTTP.start(host, port) { |http|
  http.request(request) { |response|
    response.read_body { |chunk|
      timeout(TIMEOUT_SECS){ func(chunk) }  
    }
  }
}

とやっても駄目だ。 read_body は一種の永久ループなので、これが timeout することは決してない。 いや、強制的に timeout させても意味がない、というべきか。 ブロック内の実行がある一定時間以上かかっていたらそのブロックを抜けるようなコードを書いてあげなくてはならないのだが、 スレッドを使って

  http.request(request) { |response|
    response.read_body { |chunk|
      $func_result = false
      t = Thread.new {
         sleep(TIMEOUT_SECS)
         if($func_result != true)
            Syslog.log(Syslog::LOG_WARNING, "Timeout")
            return
         end
      }
      $func_result = func(chunk)
      t.kill
    }
  }

こうすると、スレッドの中で sleep した後に、func がまだ true を返していなければ、 スレッドを抜ける前に syslog に警告を残してブロックを抜けてくれる。 funcがスレッドより先に終了すれば、t.kill でスレッドの後始末をする。 のではないかと思ったわけだ。 うまく動いてくれるかどうがわからんが。

ていうか、 a = func() という代入式があったとして、a の値が変更されるのは、 func() から値が返った後だよな。 返るまでは a には元の値が入ってるよな。 マルチスレッドだとそういうタイミングが問題になってくる。

スレッドって面白いけど難しいよな。どうやって動作確認すりゃいいのか、よくわからん。

ていうか、chunk に値が渡されるまでブロックの中身は実行されなさそうだよな。 だとするとこういうこと書いても無意味かもしれん。 じゃあどうすりゃいいんだという。 なんか RubyC言語のレベルでいじる(ライブラリを書く、read_bodyの実装をいじる)とかしないと根本的解決にはならんような気がしてきた。