Class: Daemon::Supervision::ProcessSupervisor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/onapp/engine/supervision/process_supervisor.rb

Constant Summary

COMMAND_MESSAGE_TTL =
3000
TASK_MAX_PRIORITY =
3

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (ProcessSupervisor) initialize(worker, level = nil)

Returns a new instance of ProcessSupervisor



50
51
52
53
54
55
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 50

def initialize(worker, level = nil)
  @maximum ||= 1
  @level ||= level
  @conn_mutex = Mutex.new
  @worker = worker
end

Instance Attribute Details

- (Object) level (readonly)

Returns the value of attribute level



27
28
29
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 27

def level
  @level
end

- (Object) maximum (readonly)

Returns the value of attribute maximum



27
28
29
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 27

def maximum
  @maximum
end

Class Method Details

+ (Object) formatted_name



30
31
32
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 30

def formatted_name
  name.demodulize.underscore
end

+ (Object) workers



34
35
36
37
38
39
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 34

def workers
  Registry
    .workers('*', formatted_name)
    .map { |registry| registry.all.tap { |hash| hash.delete(registry.key) } }
    .inject({}, :merge)
end

Instance Method Details

- (Boolean) allow_new_process?

Returns:

  • (Boolean)


75
76
77
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 75

def allow_new_process?
  registry.total_count < maximum
end

- (Object) logger



61
62
63
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 61

def logger
  @logger ||= registry.logger
end

- (Object) messages_manager



93
94
95
96
97
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 93

def messages_manager
  @messages_manager ||=
    Daemon::Utils::ProcessCache.new(proc{ MessagesManager.new(name, maximum) })
  @messages_manager.access
end

- (Object) name



42
43
44
45
46
47
48
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 42

def name
  @name ||= begin
              name = self.class.formatted_name
              name += "_level#{level}" if level
              name
            end
end

- (Object) on_command(command)



85
86
87
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 85

def on_command(command)
  perform if command == Supervision::PROCEED
end

- (Object) on_task



89
90
91
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 89

def on_task
  perform
end

- (Object) perform



65
66
67
68
69
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 65

def perform
  allow_new_process? || return

  EM.defer(&method(:operation))
end

- (Object) registry



57
58
59
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 57

def registry
  @registry ||= Registry.new(name) { @worker.new(level: level) }
end

- (Object) subscribe



79
80
81
82
83
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 79

def subscribe
  messages_manager.on_command(&method(:on_command))
  messages_manager.on_task(&method(:on_task))
  messages_manager.subscribe
end

- (Object) worker_name



71
72
73
# File 'lib/onapp/engine/supervision/process_supervisor.rb', line 71

def worker_name
  @worker_name ||= name.singularize.camelize
end