关于ruby:ThreadPool中的死锁

关于ruby:ThreadPool中的死锁

Deadlock in ThreadPool

我找不到适合Ruby的不错的ThreadPool实现,所以我写了我的(部分基于此处的代码:http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/ posts / show / 3276,但更改为等待/信号和其他实现ThreadPool关闭的实现,但是经过一段时间的运行(拥有100个线程并处理了大约1300个任务),它死于第25行的死锁-等待新工作 有什么想法,为什么会发生?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts"Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError,"Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end

好的,因此实现的主要问题是:如何确保没有信号丢失并避免死锁?

以我的经验,使用条件变量和互斥锁确实很难做到这一点,但使用信号量则很容易。碰巧红宝石实现了一个名为Queue(或SizedQueue)的对象,该对象应该可以解决问题。这是我建议的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts"Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError,"Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

这是一个简单的测试代码:

1
2
3
4
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print"Computation #{i} done. Nb of tasks: #{tp.size}\
"
} }
tp.shutdown


您可以尝试work_queue gem,它用于协调生产者和工作线程池之间的工作。


多年来,顶级评论者的代码提供了很多帮助。在这里,它针对ruby 2.x进行了更新,并通过线程识别进行了改进。那有什么改善?当每个线程都有一个ID时,可以用存储任意信息的数组组成ThreadPool。一些想法:

  • 无数组:典型ThreadPool用法。即使使用GIL,它也可以使死代码易于编码,并且对于高延迟应用程序(例如,大量Web爬网,
  • ThreadPool和Array的大小取决于CPU的数量:易于派生进程以使用所有CPU,
  • ThreadPool和Array的大小取决于资源的数量:例如,每个数组元素代表一个实例池中的一个处理器,因此,如果您有10个实例,每个实例具有4个CPU,则TP可以管理40个子进程的工作。

对于这最后两个,不要考虑线程在做工作,而要考虑ThreadPool管理正在工作的子流程。管理任务是轻量级的,并且与关心GIL的子流程结合在一起时。

使用此类,您可以用大约一百行代码来编写基于集群的MapReduce!这段代码很简短,虽然有点让人费解。希望能帮助到你。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError,"Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end


好的,问题似乎出在您的ThreadPool#signal方法中。可能发生的是:

1-您所有的工人都很忙,您尝试处理新工作

2-第90行得到一名零工人

3-工人被释放并发出信号,但由于ThreadPool不等待信号而丢失了信号

4-您落在第95行,即使有空闲工人,也要等待。

这里的错误是即使没有人在听,您也可以向自由工作者发出信号。此ThreadPool#signal方法应为:

1
2
3
def signal
     @mutex.synchronize { @cv.signal }
end

这个问题在Worker对象中也是一样。可能发生的情况是:

1-工人刚刚完成工作

2-检查(第17行)是否有等待的工作:没有

3-线程池发送一个新作业并发出信号...但是信号丢失

4-即使信号被标记为忙,工作人员仍在等待信号

您应该将您的initialize方法放置为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

接下来,Worker#get_block和Worker#reset_block方法不应再同步。这样,您就无法在块测试和信号等待之间将块分配给工作人员。


我在这里有些偏见,但是我建议您使用某种流程语言对此模型进行建模并对其进行模型检查。免费提供的工具包括,例如mCRL2工具集(使用基于ACP的语言),移动工作台(pi演算)和Spin(PROMELA)。

否则,我建议删除对问题不重要的所有代码,并找出发生死锁的最小情况。我怀疑100个线程和1300个任务对于陷入僵局是否必不可少。在较小的情况下,您可能只需添加一些调试打印即可提供足够的信息来解决问题。


推荐阅读

    linux命令基础代码?

    linux命令基础代码?,基础,系统,管理,工作,代码,网络,单位,信息,数据,命令,lin

    linux打开代码命令行?

    linux打开代码命令行?,系统,首页,工具,终端,代码,密码,情况,命令,快捷键,窗

    linux克隆代码命令?

    linux克隆代码命令?,系统,代码,文件,命令,目录,源文件,文件夹,路径,目标,表

    linux命令行调试代码?

    linux命令行调试代码?,环境,代码,信息,平台,程序,编辑,版本,步骤,体系结构,

    linux编译源代码命令?

    linux编译源代码命令?,工具,代码,百度,最新,环境,项目,系统,电脑,密码,内核,l

    linux命令提交代码?

    linux命令提交代码?,工作,系统,地址,代码,命令,数据,信息,目录,标准,发行,求

    linux代码同步命令?

    linux代码同步命令?,时间,服务,系统,地址,代码,网络,通信,图片,风险,管理,lin

    linux命令错误代码?

    linux命令错误代码?,系统,密码,电脑,网络,手机,网址,软件,代码,设备,老板,Lin

    linux同步代码命令?

    linux同步代码命令?,时间,系统,通信,网络,标准,图片,服务,代码,线程,单位,Lin

    linux拉取代码命令?

    linux拉取代码命令?,代码,工作,地址,命令,数据,系统,单位,生产,软件,目录,lin

    linux代码对齐命令?

    linux代码对齐命令?,系统,地址,标准,信息,对比,名称,代码,命令,文件,工作,lin

    linux命令运行代码?

    linux命令运行代码?,代码,单位,系统,环境,连续,保险,工具,命令,文件,音乐,Lin

    搭建linux命令行代码?

    搭建linux命令行代码?,系统,软件,工作,名字,服务,代码,地址,环境,管理,密码,l

    linux查看命令代码?

    linux查看命令代码?,系统,信息,代码,名称,命令,设备,数字,第一,软件,管理,在L

    linux删除代码命令行?

    linux删除代码命令行?,系统,代码,命令,文件,不了,环境,档案,名称,目录,文件

    linux命令行代码实现?

    linux命令行代码实现?,标准,代码,管理,网络,地址,工作,命令,网上,环境,名称,

    linux桌面命令代码?

    linux桌面命令代码?,电脑,系统,密码,环境,代码,基础,地址,服务,网上,通讯,lin

    c代码执行linux命令?

    c代码执行linux命令?,系统,工作,标准,情况,代码,环境,设备,命令,函数,指令,li

    linux进入代码行命令?

    linux进入代码行命令?,系统,代码,设备,终端,环境,信息,第一,命令,窗口,模式,

    linux命令行看代码?

    linux命令行看代码?,代码,基础,系统,命令,数字,工作,情况,进程,程序,终端,在L