Class: Daemon::Supervision::MessagesManager

Inherits:
Object
  • Object
show all
Includes:
Utils::Loggable
Defined in:
lib/onapp/engine/supervision/messages_manager.rb

Constant Summary

COMMAND_TTL =
3000
TASK_MAX_PRIORITY =
3

Instance Attribute Summary (collapse)

Attributes included from Utils::Loggable

#logger

Instance Method Summary (collapse)

Methods included from Utils::Loggable

#debug, #error, #fatal, #info, #lwarn

Constructor Details

- (MessagesManager) initialize(name, maximum_tasks, logger: Daemon.logger)

Returns a new instance of MessagesManager



13
14
15
16
17
18
19
20
21
22
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 13

def initialize(name, maximum_tasks, logger: Daemon.logger)
  @name = name
  @logger = logger
  @facility = "#{name} messages manager"

  if maximum_tasks.to_i <= 0
    raise "Can't proceed with maximum_tasks #{maximum_tasks}"
  end
  @maximum_tasks = maximum_tasks
end

Instance Attribute Details

- (Object) maximum_tasks (readonly)

Returns the value of attribute maximum_tasks



11
12
13
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 11

def maximum_tasks
  @maximum_tasks
end

- (Object) name (readonly)

Returns the value of attribute name



11
12
13
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 11

def name
  @name
end

Instance Method Details

- (Object) commands_exchange_name



28
29
30
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 28

def commands_exchange_name
  name + '_commands'
end

- (Object) on_command(&block)



36
37
38
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 36

def on_command(&block)
  @on_command = block
end

- (Object) on_task(&block)



32
33
34
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 32

def on_task(&block)
  @on_task = block
end

- (Object) publish_command(command)



88
89
90
91
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 88

def publish_command(command)
  debug { "Publishing command #{command.inspect}" }
  channel.fanout(commands_exchange_name).publish(command)
end

- (Object) publish_task(task)



83
84
85
86
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 83

def publish_task(task)
  debug { "Publishing task #{task.inspect}" }
  channel.default_exchange.publish(task.to_s, routing_key: task_queue_name)
end

- (Object) subscribe



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 44

def subscribe
  return if @subscribed

  debug { "Subscribing..." }

  command_queue.subscribe(ack: false) do |, payload|
    debug { "New command #{payload.inspect}" }

    if @on_command
      begin
        @on_command.call(payload)
      rescue => ex
        error { "Can't process command! #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
      end
    end
  end

  tasks.subscribe do
    debug { "New task" }

    if @on_task
      begin
        @on_task.call
      rescue => ex
        error { "Can't process task! #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
      end
    end
  end

  @subscribed = true
rescue => ex
  Daemon.logger.log_exception(ex)
end

- (Object) task_queue_name



24
25
26
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 24

def task_queue_name
  name + '_tasks'
end

- (Object) tasks



78
79
80
81
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 78

def tasks
  @tasks ||=
    Daemon::Messaging.mailbox(task_queue)
end

- (Object) tasks_status(&block)



40
41
42
# File 'lib/onapp/engine/supervision/messages_manager.rb', line 40

def tasks_status(&block)
  task_queue.status(&block)
end