concurrent-ruby
Advanced tools
| module Concurrent | ||
| module Collection | ||
| # @!visibility private | ||
| # @!macro ruby_timeout_queue | ||
| class RubyTimeoutQueue < ::Queue | ||
| def initialize(*args) | ||
| if RUBY_VERSION >= '3.2' | ||
| raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead" | ||
| end | ||
| super(*args) | ||
| @mutex = Mutex.new | ||
| @cond_var = ConditionVariable.new | ||
| end | ||
| def push(obj) | ||
| @mutex.synchronize do | ||
| super(obj) | ||
| @cond_var.signal | ||
| end | ||
| end | ||
| alias_method :enq, :push | ||
| alias_method :<<, :push | ||
| def pop(non_block = false, timeout: nil) | ||
| if non_block && timeout | ||
| raise ArgumentError, "can't set a timeout if non_block is enabled" | ||
| end | ||
| if non_block | ||
| super(true) | ||
| elsif timeout | ||
| @mutex.synchronize do | ||
| deadline = Concurrent.monotonic_time + timeout | ||
| while (now = Concurrent.monotonic_time) < deadline && empty? | ||
| @cond_var.wait(@mutex, deadline - now) | ||
| end | ||
| begin | ||
| return super(true) | ||
| rescue ThreadError | ||
| # still empty | ||
| nil | ||
| end | ||
| end | ||
| else | ||
| super(false) | ||
| end | ||
| end | ||
| alias_method :deq, :pop | ||
| alias_method :shift, :pop | ||
| end | ||
| private_constant :RubyTimeoutQueue | ||
| end | ||
| end |
| module Concurrent | ||
| module Collection | ||
| # @!visibility private | ||
| # @!macro internal_implementation_note | ||
| TimeoutQueueImplementation = if RUBY_VERSION >= '3.2' | ||
| ::Queue | ||
| else | ||
| require 'concurrent/collection/ruby_timeout_queue' | ||
| RubyTimeoutQueue | ||
| end | ||
| private_constant :TimeoutQueueImplementation | ||
| # @!visibility private | ||
| # @!macro timeout_queue | ||
| class TimeoutQueue < TimeoutQueueImplementation | ||
| end | ||
| end | ||
| end |
+6
-0
| ## Current | ||
| ## Release v1.3.6 (13 December 2025) | ||
| concurrent-ruby: | ||
| * See the [release notes on GitHub](https://github.com/ruby-concurrency/concurrent-ruby/releases/tag/v1.3.6). | ||
| ## Release v1.3.5, edge v0.7.2 (15 January 2025) | ||
@@ -4,0 +10,0 @@ |
@@ -9,2 +9,3 @@ require 'concurrent/utility/engine' | ||
| return false if Concurrent.on_jruby? || Concurrent.on_truffleruby? | ||
| return RUBY_VERSION < "3.0" if Concurrent.on_cruby? | ||
@@ -11,0 +12,0 @@ mutex = Mutex.new |
@@ -84,6 +84,4 @@ require 'concurrent/utility/engine' | ||
| # | ||
| # This is a no-op on some pool implementation (e.g. the Java one). The Ruby | ||
| # pool will auto-prune each time a new job is posted. You will need to call | ||
| # this method explicitly in case your application post jobs in bursts (a | ||
| # lot of jobs and then nothing for long periods) | ||
| # This is a no-op on all pool implementations as they prune themselves | ||
| # automatically, and has been deprecated. | ||
@@ -90,0 +88,0 @@ # @!macro thread_pool_executor_public_api |
@@ -49,2 +49,3 @@ require 'concurrent/utility/engine' | ||
| @executor.shutdownNow | ||
| wait_for_termination | ||
| nil | ||
@@ -51,0 +52,0 @@ end |
@@ -11,2 +11,3 @@ if Concurrent.on_jruby? | ||
| class JavaThreadPoolExecutor < JavaExecutorService | ||
| include Concern::Deprecation | ||
@@ -104,2 +105,3 @@ # @!macro thread_pool_executor_constant_default_max_pool_size | ||
| def prune_pool | ||
| deprecated "#prune_pool has no effect and will be removed in the next release." | ||
| end | ||
@@ -106,0 +108,0 @@ |
| require 'concurrent/executor/ruby_thread_pool_executor' | ||
| require 'concurrent/executor/serial_executor_service' | ||
@@ -9,2 +10,3 @@ module Concurrent | ||
| class RubySingleThreadExecutor < RubyThreadPoolExecutor | ||
| include SerialExecutorService | ||
@@ -11,0 +13,0 @@ # @!macro single_thread_executor_method_initialize |
@@ -6,2 +6,3 @@ require 'thread' | ||
| require 'concurrent/utility/monotonic_time' | ||
| require 'concurrent/collection/timeout_queue' | ||
@@ -14,2 +15,3 @@ module Concurrent | ||
| class RubyThreadPoolExecutor < RubyExecutorService | ||
| include Concern::Deprecation | ||
@@ -99,8 +101,27 @@ # @!macro thread_pool_executor_constant_default_max_pool_size | ||
| # removes the worker if it can be pruned | ||
| # | ||
| # @return [true, false] if the worker was pruned | ||
| # | ||
| # @!visibility private | ||
| def remove_busy_worker(worker) | ||
| synchronize { ns_remove_busy_worker worker } | ||
| def prune_worker(worker) | ||
| synchronize do | ||
| if ns_prunable_capacity > 0 | ||
| remove_worker worker | ||
| true | ||
| else | ||
| false | ||
| end | ||
| end | ||
| end | ||
| # @!visibility private | ||
| def remove_worker(worker) | ||
| synchronize do | ||
| ns_remove_ready_worker worker | ||
| ns_remove_busy_worker worker | ||
| end | ||
| end | ||
| # @!visibility private | ||
| def ready_worker(worker, last_message) | ||
@@ -122,3 +143,3 @@ synchronize { ns_ready_worker worker, last_message } | ||
| def prune_pool | ||
| synchronize { ns_prune_pool } | ||
| deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082." | ||
| end | ||
@@ -153,5 +174,2 @@ | ||
| @ruby_pid = $$ # detects if Ruby has forked | ||
| @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented | ||
| @next_gc_time = Concurrent.monotonic_time + @gc_interval | ||
| end | ||
@@ -170,8 +188,6 @@ | ||
| @scheduled_task_count += 1 | ||
| nil | ||
| else | ||
| return fallback_action(*args, &task) | ||
| fallback_action(*args, &task) | ||
| end | ||
| ns_prune_pool if @next_gc_time < Concurrent.monotonic_time | ||
| nil | ||
| end | ||
@@ -227,3 +243,3 @@ | ||
| return false if @synchronous | ||
| if !ns_limited_queue? || @queue.size < @max_queue | ||
@@ -275,3 +291,3 @@ @queue << [task, args] | ||
| # removes a worker which is not in not tracked in @ready | ||
| # removes a worker which is not tracked in @ready | ||
| # | ||
@@ -285,21 +301,23 @@ # @!visibility private | ||
| # try oldest worker if it is idle for enough time, it's returned back at the start | ||
| # @!visibility private | ||
| def ns_remove_ready_worker(worker) | ||
| if index = @ready.index { |rw, _| rw == worker } | ||
| @ready.delete_at(index) | ||
| end | ||
| true | ||
| end | ||
| # @return [Integer] number of excess idle workers which can be removed without | ||
| # going below min_length, or all workers if not running | ||
| # | ||
| # @!visibility private | ||
| def ns_prune_pool | ||
| now = Concurrent.monotonic_time | ||
| stopped_workers = 0 | ||
| while !@ready.empty? && (@pool.size - stopped_workers > @min_length) | ||
| worker, last_message = @ready.first | ||
| if now - last_message > self.idletime | ||
| stopped_workers += 1 | ||
| @ready.shift | ||
| worker << :stop | ||
| else break | ||
| end | ||
| def ns_prunable_capacity | ||
| if running? | ||
| [@pool.size - @min_length, @ready.size].min | ||
| else | ||
| @pool.size | ||
| end | ||
| @next_gc_time = Concurrent.monotonic_time + @gc_interval | ||
| end | ||
| # @!visibility private | ||
| def ns_reset_if_forked | ||
@@ -324,3 +342,3 @@ if $$ != @ruby_pid | ||
| # instance variables accessed only under pool's lock so no need to sync here again | ||
| @queue = Queue.new | ||
| @queue = Collection::TimeoutQueue.new | ||
| @pool = pool | ||
@@ -351,9 +369,13 @@ @thread = create_worker @queue, pool, pool.idletime | ||
| catch(:stop) do | ||
| prunable = true | ||
| loop do | ||
| case message = my_queue.pop | ||
| timeout = prunable && my_pool.running? ? my_idletime : nil | ||
| case message = my_queue.pop(timeout: timeout) | ||
| when nil | ||
| throw :stop if my_pool.prune_worker(self) | ||
| prunable = false | ||
| when :stop | ||
| my_pool.remove_busy_worker(self) | ||
| my_pool.remove_worker(self) | ||
| throw :stop | ||
| else | ||
@@ -363,2 +385,3 @@ task, args = message | ||
| my_pool.ready_worker(self, Concurrent.monotonic_time) | ||
| prunable = true | ||
| end | ||
@@ -365,0 +388,0 @@ end |
@@ -64,2 +64,3 @@ require 'concurrent/scheduled_task' | ||
| shutdown | ||
| @timer_executor.kill | ||
| end | ||
@@ -126,3 +127,5 @@ | ||
| @queue.clear | ||
| @timer_executor.kill | ||
| @condition.set | ||
| @condition.reset | ||
| @timer_executor.shutdown | ||
| stopped_event.set | ||
@@ -129,0 +132,0 @@ end |
@@ -13,3 +13,2 @@ require 'concurrent/executor/abstract_executor_service' | ||
| require 'concurrent/executor/ruby_thread_pool_executor' | ||
| require 'concurrent/executor/cached_thread_pool' | ||
| require 'concurrent/executor/safe_task_executor' | ||
@@ -16,0 +15,0 @@ require 'concurrent/executor/serial_executor_service' |
@@ -12,3 +12,3 @@ require 'concurrent/concern/dereferenceable' | ||
| # On top of the fundamental `#put` and `#take` operations, we also provide a | ||
| # `#mutate` that is atomic with respect to operations on the same instance. | ||
| # `#modify` that is atomic with respect to operations on the same instance. | ||
| # These operations all support timeouts. | ||
@@ -91,3 +91,3 @@ # | ||
| # if we timeoud out we'll still be empty | ||
| # If we timed out we'll still be empty | ||
| if unlocked_full? | ||
@@ -121,6 +121,6 @@ yield @value | ||
| # Atomically `take`, yield the value to a block for transformation, and then | ||
| # `put` the transformed value. Returns the transformed value. A timeout can | ||
| # `put` the transformed value. Returns the pre-transform value. A timeout can | ||
| # be set to limit the time spent blocked, in which case it returns `TIMEOUT` | ||
| # if the time is exceeded. | ||
| # @return [Object] the transformed value, or `TIMEOUT` | ||
| # @return [Object] the pre-transform value, or `TIMEOUT` | ||
| def modify(timeout = nil) | ||
@@ -127,0 +127,0 @@ raise ArgumentError.new('no block given') unless block_given? |
@@ -170,3 +170,3 @@ require 'thread' | ||
| # c1.wait.state #=> :fulfilled | ||
| # c1.value #=> 45 | ||
| # c1.value #=> 42 | ||
| # c2.wait.state #=> :rejected | ||
@@ -173,0 +173,0 @@ # c2.reason #=> #<RuntimeError: Boom!> |
@@ -5,2 +5,3 @@ require 'concurrent/collection/copy_on_notify_observer_set' | ||
| require 'concurrent/atomic/atomic_boolean' | ||
| require 'concurrent/atomic/atomic_fixnum' | ||
| require 'concurrent/executor/executor_service' | ||
@@ -240,2 +241,3 @@ require 'concurrent/executor/ruby_executor_service' | ||
| @running.make_true | ||
| @age.increment | ||
| schedule_next_task(@run_now ? 0 : @execution_interval) | ||
@@ -314,2 +316,3 @@ end | ||
| @running = Concurrent::AtomicBoolean.new(false) | ||
| @age = Concurrent::AtomicFixnum.new(0) | ||
| @value = nil | ||
@@ -334,3 +337,3 @@ | ||
| def schedule_next_task(interval = execution_interval) | ||
| ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new], &method(:execute_task)) | ||
| ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new, @age.value], &method(:execute_task)) | ||
| nil | ||
@@ -340,4 +343,6 @@ end | ||
| # @!visibility private | ||
| def execute_task(completion) | ||
| def execute_task(completion, age_when_scheduled) | ||
| return nil unless @running.true? | ||
| return nil unless @age.value == age_when_scheduled | ||
| start_time = Concurrent.monotonic_time | ||
@@ -344,0 +349,0 @@ _success, value, reason = @task.execute(self) |
| module Concurrent | ||
| VERSION = '1.3.5' | ||
| VERSION = '1.3.6' | ||
| end |
+4
-2
@@ -210,3 +210,3 @@ # Concurrent Ruby | ||
| These features are under active development and may change frequently. They are expected not to | ||
| keep backward compatibility (there may also lack tests and documentation). Semantic versions will | ||
| keep backward compatibility (they may also lack tests and documentation). Semantic versions will | ||
| be obeyed though. Features developed in `concurrent-ruby-edge` are expected to move to | ||
@@ -362,3 +362,4 @@ `concurrent-ruby` when final. | ||
| * Set env variable `CONCURRENT_JRUBY_HOME` to point to it, e.g. `/usr/local/opt/rbenv/versions/jruby-9.2.17.0` | ||
| * Install Docker, required for Windows builds | ||
| * Install Docker or Podman, required for Windows builds | ||
| * If `bundle config get path` is set, use `bundle config set --local path.system true` otherwise the `gem name, path: '.'` gems won't be found (Bundler limitation). | ||
@@ -383,2 +384,3 @@ ### Publishing the Gem | ||
| * [Samuel Williams](https://github.com/ioquatix) | ||
| * [Joshua Young](https://github.com/joshuay03) | ||
@@ -385,0 +387,0 @@ ### Special Thanks to |
Sorry, the diff of this file is not supported yet