Class: Daemon::Supervision::MessagesManager
- Inherits:
-
Object
- Object
- Daemon::Supervision::MessagesManager
show all
- Includes:
- Utils::Loggable
- Defined in:
- lib/onapp/engine/supervision/messages_manager.rb
Constant Summary
- COMMAND_TTL =
3000
- TASK_MAX_PRIORITY =
3
Instance Attribute Summary (collapse)
#logger
Instance Method Summary
(collapse)
#debug, #error, #fatal, #info, #lwarn
Constructor Details
- (MessagesManager) initialize(name, maximum_tasks, logger: Daemon.logger)
Returns a new instance of MessagesManager
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 13
def initialize(name, maximum_tasks, logger: Daemon.logger)
@name = name
@logger = logger
@facility = "#{name} messages manager"
if maximum_tasks.to_i <= 0
raise "Can't proceed with maximum_tasks #{maximum_tasks}"
end
@maximum_tasks = maximum_tasks
end
|
Instance Attribute Details
- (Object) maximum_tasks
Returns the value of attribute maximum_tasks
11
12
13
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 11
def maximum_tasks
@maximum_tasks
end
|
- (Object) name
Returns the value of attribute name
11
12
13
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 11
def name
@name
end
|
Instance Method Details
- (Object) commands_exchange_name
28
29
30
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 28
def commands_exchange_name
name + '_commands'
end
|
- (Object) on_command(&block)
36
37
38
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 36
def on_command(&block)
@on_command = block
end
|
- (Object) on_task(&block)
32
33
34
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 32
def on_task(&block)
@on_task = block
end
|
- (Object) publish_command(command)
88
89
90
91
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 88
def publish_command(command)
debug { "Publishing command #{command.inspect}" }
channel.fanout(commands_exchange_name).publish(command)
end
|
- (Object) publish_task(task)
83
84
85
86
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 83
def publish_task(task)
debug { "Publishing task #{task.inspect}" }
channel.default_exchange.publish(task.to_s, routing_key: task_queue_name)
end
|
- (Object) subscribe
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 44
def subscribe
return if @subscribed
debug { "Subscribing..." }
command_queue.subscribe(ack: false) do |metadata, payload|
debug { "New command #{payload.inspect}" }
if @on_command
begin
@on_command.call(payload)
rescue => ex
error { "Can't process command! #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
end
end
end
tasks.subscribe do
debug { "New task" }
if @on_task
begin
@on_task.call
rescue => ex
error { "Can't process task! #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
end
end
end
@subscribed = true
rescue => ex
Daemon.logger.log_exception(ex)
end
|
- (Object) task_queue_name
24
25
26
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 24
def task_queue_name
name + '_tasks'
end
|
- (Object) tasks
78
79
80
81
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 78
def tasks
@tasks ||=
Daemon::Messaging.mailbox(task_queue)
end
|
- (Object) tasks_status(&block)
40
41
42
|
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 40
def tasks_status(&block)
task_queue.status(&block)
end
|