Class: Daemon::Messaging::Mailbox

Inherits:
Object
  • Object
show all
Defined in:
lib/onapp/engine/messaging/mailbox.rb

Overview

Subscribes to queue, and collects unacked messages until they will be popped and acknowledged

Instance Method Summary (collapse)

Constructor Details

- (Mailbox) initialize(queue)

Returns a new instance of Mailbox

Parameters:

  • queue (AMQP::Queue)

    Queue to subscribe



15
16
17
18
19
20
# File 'lib/onapp/engine/messaging/mailbox.rb', line 15

def initialize(queue)
  @queue = queue
  @mailbox = Daemon::Utils::SharedValue.new(PQueue.new)
  @callbacks = Daemon::Utils::SharedValue.new([])
  subscribe_to_queue
end

Instance Method Details

- (Boolean) any?

Returns:

  • (Boolean)


40
41
42
# File 'lib/onapp/engine/messaging/mailbox.rb', line 40

def any?
  size > 0
end

- (Object) pop

Removes message from PQueue



23
24
25
# File 'lib/onapp/engine/messaging/mailbox.rb', line 23

def pop
  @mailbox.acquire(&:pop)
end

- (Object) size

Messages count



36
37
38
# File 'lib/onapp/engine/messaging/mailbox.rb', line 36

def size
  @mailbox.acquire(&:size)
end

- (Object) subscribe(&block)

Add subscriber block. The block will be called with no arguments when new message arrives



29
30
31
32
33
# File 'lib/onapp/engine/messaging/mailbox.rb', line 29

def subscribe(&block)
  if block
    @callbacks.acquire { |c| c << block }
  end
end