The core of a rufus-scheduler. See implementations like Rufus::Scheduler::PlainScheduler and Rufus::Scheduler::EmScheduler for directly usable stuff.
classical options hash
Instantiates a Rufus::Scheduler.
    # File lib/rufus/sc/scheduler.rb, line 100
    def initialize(opts={})
      @options = opts
      @jobs = get_queue(:at, opts)
      @cron_jobs = get_queue(:cron, opts)
      @frequency = @options[:frequency] || 0.330
      @mutexes = {}
    end
Instantiates and starts a new Rufus::Scheduler.
    # File lib/rufus/sc/scheduler.rb, line 114
    def self.start_new(opts={})
      s = self.new(opts)
      s.start
      s
    end
Returns a map job_id => job of all the jobs currently in the scheduler
    # File lib/rufus/sc/scheduler.rb, line 279
    def all_jobs
      jobs.merge(cron_jobs)
    end
Schedules a job at a given point in time.
    scheduler.at 'Thu Mar 26 19:30:00 2009' do
      puts 'order pizza'
    end
pizza is for Thursday at 2000 (if the shop brochure is right).
    # File lib/rufus/sc/scheduler.rb, line 147
    def at(t, s=nil, opts={}, &block)
      add_job(AtJob.new(self, t, combine_opts(s, opts), &block))
    end
Schedules a job given a cron string.
    scheduler.cron '0 22 * * 1-5' do
      # Monday to Friday at 22:00
      puts 'activate security system'
    end
    # File lib/rufus/sc/scheduler.rb, line 174
    def cron(cronstring, s=nil, opts={}, &block)
      add_cron_job(CronJob.new(self, cronstring, combine_opts(s, opts), &block))
    end
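To make the field order of a cron string explicit, here is a minimal sketch that labels the five fields of the string used in the example. It is plain Ruby and does not call rufus-scheduler; `CRON_FIELDS` and `label_cron_fields` are hypothetical names introduced for illustration, and the field order assumed is the standard cron one (minute, hour, day of month, month, day of week).

```ruby
# Standard five-field cron order (illustrative constant, not part of rufus-scheduler).
CRON_FIELDS = [:minute, :hour, :day_of_month, :month, :day_of_week].freeze

# Splits a five-field cron string and pairs each field with its name.
def label_cron_fields(cronstring)
  fields = cronstring.split
  unless fields.size == CRON_FIELDS.size
    raise ArgumentError, "expected 5 fields, got #{fields.size}"
  end
  Hash[CRON_FIELDS.zip(fields)]
end

labels = label_cron_fields('0 22 * * 1-5')
puts "minute=#{labels[:minute]} hour=#{labels[:hour]} day_of_week=#{labels[:day_of_week]}"
```

Read this way, the string means minute 0 of hour 22, on days 1 to 5 of the week.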
Returns a map job_id => job for cron jobs
    # File lib/rufus/sc/scheduler.rb, line 272
    def cron_jobs
      @cron_jobs.to_h
    end
Checks whether a log_exception, handle_exception or on_exception method is defined (in that order). If one is, hands the exception to it; otherwise defaults to outputting the details to $stderr.
    # File lib/rufus/sc/scheduler.rb, line 225
    def do_handle_exception(job, exception)
      begin
        [ :log_exception, :handle_exception, :on_exception ].each do |m|
          next unless self.respond_to?(m)
          if method(m).arity == 1
            self.send(m, exception)
          else
            self.send(m, job, exception)
          end
          return # exception was handled successfully
        end
      rescue Exception => e
        $stderr.puts '*' * 80
        $stderr.puts 'the exception handling method itself had an issue:'
        $stderr.puts e
        $stderr.puts *e.backtrace
        $stderr.puts '*' * 80
      end
      $stderr.puts '=' * 80
      $stderr.puts 'scheduler caught exception:'
      $stderr.puts exception
      $stderr.puts *exception.backtrace
      $stderr.puts '=' * 80
    end
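The handler lookup can be tried in isolation. The sketch below reproduces only the dispatch step (first matching method wins, and the arity decides whether the job is passed along); it leaves out the $stderr fallback. `HandlerLookupSketch`, `TwoArgHandler`, `OneArgHandler` and `dispatch` are hypothetical names, not part of rufus-scheduler.

```ruby
# Hypothetical stand-in that reproduces only the handler lookup shown above.
class HandlerLookupSketch
  HANDLER_NAMES = [:log_exception, :handle_exception, :on_exception].freeze

  # Returns the name of the handler that was called, or nil if none exists.
  def dispatch(job, exception)
    HANDLER_NAMES.each do |name|
      next unless respond_to?(name)
      if method(name).arity == 1
        send(name, exception)       # one-argument handlers get the exception only
      else
        send(name, job, exception)  # otherwise the job is passed first
      end
      return name
    end
    nil
  end
end

# A scheduler-like object with a two-argument handler.
class TwoArgHandler < HandlerLookupSketch
  attr_reader :seen

  def on_exception(job, exception)
    @seen = [job, exception.message]
  end
end

# A scheduler-like object with a one-argument handler.
class OneArgHandler < HandlerLookupSketch
  attr_reader :seen

  def handle_exception(exception)
    @seen = exception.message
  end
end

two = TwoArgHandler.new
two.dispatch(:job_42, RuntimeError.new('boom'))
puts two.seen.inspect

one = OneArgHandler.new
one.dispatch(:job_42, RuntimeError.new('boom'))
puts one.seen.inspect
```

In practice this suggests that defining one of the three methods on a scheduler subclass (or on the instance) is enough to take over exception reporting.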
Schedules a recurring job every t.
    scheduler.every '5m1w' do
      puts 'check blood pressure'
    end
checking blood pressure every week and 5 minutes (a lowercase 'm' means minutes, as in the '20m' example for in; months would need an uppercase 'M').
    # File lib/rufus/sc/scheduler.rb, line 161
    def every(t, s=nil, opts={}, &block)
      add_job(EveryJob.new(self, t, combine_opts(s, opts), &block))
    end
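To get a feel for how a duration string such as '5m1w' adds up, here is a minimal sketch of a parser. It is not the library's own parser: `UNIT_SECONDS` and `duration_to_seconds` are hypothetical, the unit table is a deliberately small subset, and it assumes that a lowercase 'm' means minutes (as the '20m' example for in indicates) and 'w' means weeks.

```ruby
# Seconds per unit; an illustrative subset, not the library's own table.
UNIT_SECONDS = {
  's' => 1, 'm' => 60, 'h' => 3600, 'd' => 86_400, 'w' => 604_800
}.freeze

# Sums every "<number><unit>" pair found in the string, in seconds.
# Note: characters that do not form such a pair are simply ignored here.
def duration_to_seconds(str)
  pairs = str.scan(/(\d+)([smhdw])/)
  raise ArgumentError, "cannot parse #{str.inspect}" if pairs.empty?
  pairs.inject(0) { |sum, (n, unit)| sum + n.to_i * UNIT_SECONDS[unit] }
end

puts duration_to_seconds('5m1w') # 5 * 60 + 604_800
puts duration_to_seconds('20m')  # 20 * 60
```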
Mostly used to find a job given its id. If the argument is a job, will simply return it.
If the argument is an id, and no job with that id is found, it will raise an ArgumentError.
    # File lib/rufus/sc/scheduler.rb, line 297
    def find(job_or_id)
      return job_or_id if job_or_id.respond_to?(:job_id)
      job = all_jobs[job_or_id]
      raise ArgumentError.new(
        "couldn't find job #{job_or_id.inspect}"
      ) unless job
      job
    end
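The three outcomes of find (job passed through, id resolved, id unknown) can be observed with a plain Hash standing in for all_jobs. This is only a sketch: `SketchJob` and `find_in` are hypothetical names, and the real method reads the scheduler's own queues.

```ruby
# Hypothetical job type; only the job_id reader matters for the lookup.
SketchJob = Struct.new(:job_id)

# Mirrors the lookup rules of find against a plain Hash of job_id => job.
def find_in(all_jobs, job_or_id)
  return job_or_id if job_or_id.respond_to?(:job_id)
  job = all_jobs[job_or_id]
  raise ArgumentError, "couldn't find job #{job_or_id.inspect}" unless job
  job
end

job = SketchJob.new('at_1')
registry = { job.job_id => job }

puts find_in(registry, 'at_1').equal?(job) # looked up by id
puts find_in(registry, job).equal?(job)    # a job is returned as-is

begin
  find_in(registry, 'missing')
rescue ArgumentError => e
  puts e.message
end
```

Since pause and resume both go through find, the same ArgumentError surfaces from them when the id is unknown.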
Returns a list of jobs with the given tag
    # File lib/rufus/sc/scheduler.rb, line 286
    def find_by_tag(tag)
      all_jobs.values.select { |j| j.tags.include?(tag) }
    end
Schedules a job in a given amount of time.
    scheduler.in '20m' do
      puts "order ristretto"
    end
will order an espresso (well sort of) in 20 minutes.
    # File lib/rufus/sc/scheduler.rb, line 133
    def in(t, s=nil, opts={}, &block)
      add_job(InJob.new(self, t, combine_opts(s, opts), &block))
    end
Returns a map job_id => job for at/in/every jobs
    # File lib/rufus/sc/scheduler.rb, line 265
    def jobs
      @jobs.to_h
    end
Pauses a given job. If the argument is an id (String) and the corresponding job cannot be found, an ArgumentError will get raised.
    # File lib/rufus/sc/scheduler.rb, line 204
    def pause(job_or_id)
      find(job_or_id).pause
    end
Resumes a given job. If the argument is an id (String) and the corresponding job cannot be found, an ArgumentError will get raised.
    # File lib/rufus/sc/scheduler.rb, line 212
    def resume(job_or_id)
      find(job_or_id).resume
    end
Returns the list of the currently running jobs (jobs that just got triggered and are executing).
    # File lib/rufus/sc/scheduler.rb, line 323
    def running_jobs
      Thread.list.collect { |t|
        t["rufus_scheduler__trigger_thread__#{self.object_id}"]
      }.compact
    end
Returns the current list of trigger threads (threads) dedicated to the execution of jobs.
    # File lib/rufus/sc/scheduler.rb, line 313
    def trigger_threads
      Thread.list.select { |t|
        t["rufus_scheduler__trigger_thread__#{self.object_id}"]
      }
    end
Unschedules a job (cron or at/every/in job).
Returns the job that got unscheduled.
    # File lib/rufus/sc/scheduler.rb, line 184
    def unschedule(job_or_id)
      job_id = job_or_id.respond_to?(:job_id) ? job_or_id.job_id : job_or_id
      @jobs.unschedule(job_id) || @cron_jobs.unschedule(job_id)
    end
Given a tag, unschedules all the jobs that bear that tag.
    # File lib/rufus/sc/scheduler.rb, line 193
    def unschedule_by_tag(tag)
      jobs = find_by_tag(tag)
      jobs.each { |job| unschedule(job.job_id) }
      jobs
    end
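As a rough illustration of how tag-based selection and removal fit together, the sketch below keeps jobs in a plain Hash and removes the ones bearing a tag. `TaggedJob` and `TagRegistrySketch` are hypothetical stand-ins for the scheduler and its job queues; only the select-then-remove logic follows the source above.

```ruby
# Hypothetical job type carrying an id and a list of tags.
TaggedJob = Struct.new(:job_id, :tags)

# Hypothetical registry that stands in for the scheduler's job queues.
class TagRegistrySketch
  def initialize(jobs)
    @jobs = {}
    jobs.each { |j| @jobs[j.job_id] = j }
  end

  def job_ids
    @jobs.keys
  end

  # Returns the jobs whose tag list includes the given tag.
  def find_by_tag(tag)
    @jobs.values.select { |j| j.tags.include?(tag) }
  end

  # Removes every job bearing the tag and returns the removed jobs.
  def unschedule_by_tag(tag)
    removed = find_by_tag(tag)
    removed.each { |j| @jobs.delete(j.job_id) }
    removed
  end
end

registry = TagRegistrySketch.new([
  TaggedJob.new('a', ['backup']),
  TaggedJob.new('b', ['backup', 'nightly']),
  TaggedJob.new('c', ['report'])
])

removed = registry.unschedule_by_tag('backup')
puts removed.map(&:job_id).inspect
puts registry.job_ids.inspect
```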
    # File lib/rufus/sc/scheduler.rb, line 383
    def add_cron_job(job)
      complain_if_blocking_and_timeout(job)
      @cron_jobs << job
      job
    end
    # File lib/rufus/sc/scheduler.rb, line 372
    def add_job(job)
      complain_if_blocking_and_timeout(job)
      return nil if job.params[:discard_past] && Time.now.to_f >= job.at
      @jobs << job
      job
    end
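The guard in add_job means that a job whose time has already passed is dropped (and nil is returned) when :discard_past is set. A minimal sketch of that rule follows; `PastAwareJob` and `accept_job` are hypothetical, and the explicit `now` argument is an addition made here so the behaviour can be checked deterministically.

```ruby
# Hypothetical job: `at` is an epoch time in seconds, `params` an options hash.
PastAwareJob = Struct.new(:at, :params)

# Returns the job when it is accepted, nil when it is discarded.
def accept_job(queue, job, now = Time.now.to_f)
  return nil if job.params[:discard_past] && now >= job.at
  queue << job
  job
end

queue = []
now = Time.now.to_f
late = PastAwareJob.new(now - 60, { :discard_past => true })
late_kept = PastAwareJob.new(now - 60, {})
future = PastAwareJob.new(now + 60, { :discard_past => true })

puts accept_job(queue, late, now).inspect          # discarded
puts accept_job(queue, late_kept, now).equal?(late_kept)
puts accept_job(queue, future, now).equal?(future)
puts queue.size
```

Callers that pass :discard_past therefore need to be prepared for a nil return value instead of a job.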
    # File lib/rufus/sc/scheduler.rb, line 349
    def combine_opts(schedulable, opts)
      if schedulable.respond_to?(:trigger) || schedulable.respond_to?(:call)
        opts[:schedulable] = schedulable
      elsif schedulable != nil
        opts = schedulable.merge(opts)
      end
      opts
    end
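combine_opts explains what the optional second argument of at, in, every and cron may be: a schedulable (anything responding to trigger or call), a hash of options, or nil. The standalone copy below, renamed `combine_opts_sketch` to mark it as an illustration, shows the three cases.

```ruby
# Standalone copy of the merging logic, for experimentation only.
def combine_opts_sketch(schedulable, opts)
  if schedulable.respond_to?(:trigger) || schedulable.respond_to?(:call)
    opts[:schedulable] = schedulable   # callable object: stored under :schedulable
  elsif !schedulable.nil?
    opts = schedulable.merge(opts)     # hash: merged, the later opts win on conflicts
  end
  opts
end

callable = lambda { puts 'triggered' }

with_callable = combine_opts_sketch(callable, { :tags => ['a'] })
with_hash = combine_opts_sketch({ :blocking => true }, { :tags => ['a'] })
with_nil = combine_opts_sketch(nil, { :tags => ['a'] })

puts with_callable.keys.inspect
puts with_hash.keys.inspect
puts with_nil.keys.inspect
```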
Raises an error if the job has the params :blocking and :timeout set
    # File lib/rufus/sc/scheduler.rb, line 394
    def complain_if_blocking_and_timeout(job)
      raise(
        ArgumentError.new('cannot set a :timeout on a :blocking job')
      ) if job.params[:blocking] and job.params[:timeout]
    end
Returns a job queue instance.
(made it into a method for easy override)
    # File lib/rufus/sc/scheduler.rb, line 336
    def get_queue(type, opts)
      q = if type == :cron
        opts[:cron_job_queue] || Rufus::Scheduler::CronJobQueue.new
      else
        opts[:job_queue] || Rufus::Scheduler::JobQueue.new
      end
      q.scheduler = self if q.respond_to?(:scheduler=)
      q
    end
The method that does the “wake up and trigger any job that should get triggered” work.
    # File lib/rufus/sc/scheduler.rb, line 366
    def step
      @cron_jobs.trigger_matching_jobs
      @jobs.trigger_matching_jobs
    end
The default, plain, implementation. If ‘blocking’ is true, will simply call the block and return when the block is done. Else, it will call the block in a dedicated thread.
TODO : clarify, the blocking here blocks the whole scheduler, while EmScheduler blocking triggers for the next tick. Not the same thing …
    # File lib/rufus/sc/scheduler.rb, line 408
    def trigger_job(params, &block)
      if params[:blocking]
        block.call
      elsif m = params[:mutex]
        m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex)
        Thread.new { m.synchronize { block.call } }
      else
        Thread.new { block.call }
      end
    end
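The source also shows a third case the description does not mention: when a :mutex param is given, the block still runs in its own thread, but threads sharing the same mutex (or the same mutex name) run one at a time. The sketch below copies the three branches into a hypothetical `TriggerSketch` class so the effect can be observed without a running scheduler.

```ruby
# Hypothetical stand-in reproducing the three branches of trigger_job.
class TriggerSketch
  def initialize
    @mutexes = {}
  end

  def trigger_job(params, &block)
    if params[:blocking]
      block.call                                    # runs in the caller's thread
    elsif (m = params[:mutex])
      # a name is mapped to a Mutex shared by all jobs using that name
      m = (@mutexes[m.to_s] ||= Mutex.new) unless m.is_a?(Mutex)
      Thread.new { m.synchronize { block.call } }
    else
      Thread.new { block.call }
    end
  end
end

sketch = TriggerSketch.new
log = []

threads = 3.times.map do |i|
  sketch.trigger_job(:mutex => 'report') do
    log << "start #{i}"
    sleep 0.01
    log << "end #{i}"
  end
end
threads.each(&:join)

# With the shared mutex, every "start n" is directly followed by its "end n".
puts log.inspect
```

Without the :mutex param the three blocks would be free to interleave, which is the behaviour of the final branch.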