Module: Daemon::Messaging

Defined in:
lib/onapp/engine/messaging.rb,
lib/onapp/engine/messaging/message.rb,
lib/onapp/engine/messaging/mailbox.rb,
lib/onapp/engine/messaging/connection_manager.rb

Overview

Messaging built on RabbitMQ queues, which are wrapped as mailboxes (see Mailbox).

Defined Under Namespace

Classes: ConnectionManager, Mailbox, Message

Constant Summary

DEFAULT_PREFETCH = 20
MAX_PRIORITY = 5
DEFAULT_CHANNEL_ID = 1

Class Attribute Details

+ (Object) connection

Deprecated.
TODO: This should be removed, because each process should have its own connection.

Keeps the RabbitMQ connection.



# File 'lib/onapp/engine/messaging.rb', line 19

def connection
  @connection ||= connect
end
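
The TODO above asks for one connection per process, while `@connection ||= connect` memoizes a single object that a forked child would inherit. A minimal sketch of one possible alternative, assuming connections are cached per process ID; `PerProcessConnection` and its `opener` block are hypothetical names used for illustration and are not part of Daemon::Messaging:

```ruby
# Hypothetical sketch: memoize one connection per process ID.
# PerProcessConnection and opener are illustrative names only.
class PerProcessConnection
  def initialize(&opener)
    @opener = opener      # block that opens a new connection
    @connections = {}     # pid => connection
  end

  # Returns the connection for the given pid, opening it on first use.
  def connection(pid = Process.pid)
    @connections[pid] ||= @opener.call
  end
end

opened = 0
pool = PerProcessConnection.new do
  opened += 1
  Object.new              # stands in for a real RabbitMQ connection
end

first  = pool.connection      # opens a connection for this process
second = pool.connection      # same process: reuses the memoized object
forked = pool.connection(-1)  # simulated different pid: opens a new one
```

In a real daemon the block would call something like `Daemon::Messaging.connect`; the stand-in object keeps the sketch runnable without a broker.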

Class Method Details

+ (Object) channel

Deprecated.

# File 'lib/onapp/engine/messaging.rb', line 37

def channel
  connection.channel.prefetch(DEFAULT_PREFETCH)
end

+ (Object) connect(params = Daemon.config.rabbitmq_connection_hash)

Connects to RabbitMQ

Parameters:

  • params (Hash) (defaults to: Daemon.config.rabbitmq_connection_hash)

    Hash of RabbitMQ connection parameters.

# File 'lib/onapp/engine/messaging.rb', line 29

def connect(params = Daemon.config.rabbitmq_connection_hash)
  ConnectionManager
    .new
    .connect(params)
end

+ (Object) default_exchange

Deprecated.

# File 'lib/onapp/engine/messaging.rb', line 43

def default_exchange
  channel.default_exchange
end

+ (Daemon::Messaging::Mailbox) mailbox(queue)

Convenience method that initializes a Mailbox for the given queue.

Returns:

  • (Daemon::Messaging::Mailbox)

# File 'lib/onapp/engine/messaging.rb', line 82

def mailbox(queue)
  Mailbox.new(queue)
end

+ (AMQP::Queue) queue(name, max_priority: nil, message_ttl: nil, channel: self.channel)

Declares a durable, non-exclusive queue, passing the priority and TTL settings to the broker only when they are given.

Parameters:

  • name (String)

    Queue name. A queue declared with an empty name is auto-deleted.

  • max_priority (Fixnum) (defaults to: nil)

    Maximum priority for the queue. No priority if nil.

  • message_ttl (Fixnum) (defaults to: nil)

    How long, in milliseconds, a message stays in the queue before the broker deletes it. If nil, messages never expire.

  • channel (AMQP::Channel) (defaults to: self.channel)

    Channel on which the queue is declared.

Returns:

  • (AMQP::Queue)


# File 'lib/onapp/engine/messaging.rb', line 56

def queue(name, max_priority: nil, message_ttl: nil, channel: self.channel)
  # Pass only the optional arguments that were actually given.
  arguments = {
    'x-max-priority' => max_priority,
    'x-message-ttl' => message_ttl
  }.reject { |_key, val| val.nil? }

  # Queues declared with an empty name are auto-deleted.
  auto_delete = (name == ::AMQ::Protocol::EMPTY_STRING)

  channel.queue(name,
                durable: true,
                auto_delete: auto_delete,
                exclusive: false,
                arguments: arguments)
end
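
To make the effect of the optional arguments concrete, here is a rough sketch that runs the same logic against a recording stand-in instead of a live broker. `RecordingChannel` and `declare_queue` are hypothetical names introduced for this illustration, and the empty-string comparison assumes `::AMQ::Protocol::EMPTY_STRING` is `''`:

```ruby
# Hypothetical stand-in for a channel: it only records what it was asked
# to declare. It is not the amqp gem's AMQP::Channel.
class RecordingChannel
  attr_reader :declared

  def initialize
    @declared = []
  end

  def queue(name, **options)
    @declared << [name, options]
    name
  end
end

# Same logic as the documented method, written against the stand-in.
def declare_queue(name, channel:, max_priority: nil, message_ttl: nil)
  # Drop the optional arguments that were not given.
  arguments = {
    'x-max-priority' => max_priority,
    'x-message-ttl' => message_ttl
  }.reject { |_key, val| val.nil? }

  channel.queue(name,
                durable: true,
                auto_delete: name == '', # assumed value of EMPTY_STRING
                exclusive: false,
                arguments: arguments)
end

channel = RecordingChannel.new
declare_queue('tasks', channel: channel, max_priority: 5)
declare_queue('', channel: channel, message_ttl: 60_000)

named_options     = channel.declared[0][1]
anonymous_options = channel.declared[1][1]
```

The named queue is declared with only `x-max-priority` set and `auto_delete: false`, while the empty-named queue carries only `x-message-ttl` and `auto_delete: true`.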