Class: Daemon::Messaging::ConnectionManager

Inherits:
Object
  • Object
show all
Includes:
Utils::Loggable
Defined in:
lib/onapp/engine/messaging/connection_manager.rb

Overview

Manages amqp connection Configures heartbeat and recovery. Notifies about connection-errors.

Constant Summary

PERIODICAL_RECONNECT_PERIOD =

Used in recovery

30
HEARTBEAT =
60
REQUIRED_VERSION =

Minimal required version Used to check current version of rabbitmq and possibly show warning

Gem::Version.new('3.5')

Instance Attribute Summary (collapse)

Attributes included from Utils::Loggable

#logger

Instance Method Summary (collapse)

Methods included from Utils::Loggable

#debug, #error, #fatal, #info, #lwarn

Constructor Details

- (ConnectionManager) initialize(logger = Daemon.logger)

Returns a new instance of ConnectionManager

Parameters:



27
28
29
30
31
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 27

def initialize(logger = Daemon.logger)
  self.logger = logger
  # see Daemon::Utils::Loggable
  @facility = 'RabbitMQ'
end

Instance Attribute Details

- (Object) connection (readonly)

Returns the value of attribute connection



24
25
26
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 24

def connection
  @connection
end

Instance Method Details

- (Object) before_recovery

Before-recovery handler



94
95
96
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 94

def before_recovery(*)
  info { "TCP connection recovered" }
end

- (AMQP::Session) connect(spec)

Returns RabbitMQ connection

Parameters:

Returns:

  • (AMQP::Session)

    RabbitMQ connection



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 36

def connect(spec)
  @connection = AMQP.connect(spec.merge(
    auto_recovery:  true,
    heartbeat:      HEARTBEAT,
    on_tcp_connection_failure:          method(:on_tcp_connection_failure),
    on_possible_authentication_failure: method(:on_possible_authentication_failure)
  ))

  connection.logger = logger
  connection.on_open(&method(:on_open))
  connection.on_error(&method(:on_error))
  connection.on_tcp_connection_loss(&method(:on_tcp_connection_loss))
  connection.on_recovery(&method(:on_recovery))
  connection.before_recovery(&method(:before_recovery))

  connection
end

- (Object) on_error(connection, connection_close)

AMQP exceptions



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 77

def on_error(connection, connection_close)
  error {
    <<MSG
Connection-level exception:
  AMQP class id:  #{connection_close.class_id}
  AMQP method id: #{connection_close.method_id}
  Status code:    #{connection_close.reply_code}
  Error message:  #{connection_close.reply_text}
MSG
  }

  if connection_close.reply_code == 320
    periodically_reconnect
  end
end

- (Object) on_open

Validates version



104
105
106
107
108
109
110
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 104

def on_open(*)
  if current_version < REQUIRED_VERSION
    lwarn { "Bad version #{current_version.to_s} < #{REQUIRED_VERSION.to_s}" }
  end

  info { "AMQP connection established" }
end

- (Object) on_possible_authentication_failure

Handles authentication failure



63
64
65
66
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 63

def on_possible_authentication_failure(*)
  error { 'looks like authentication failed' }
  periodically_reconnect
end

- (Object) on_recovery

After-recovery handler



99
100
101
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 99

def on_recovery(*)
  info { "AMQP connection recovered" }
end

- (Object) on_tcp_connection_failure

Handles tcp connection failure



56
57
58
59
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 56

def on_tcp_connection_failure(*)
  error { 'TCP connection failure' }
  periodically_reconnect
end

- (Object) on_tcp_connection_loss

Handles network connection interrupts



70
71
72
73
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 70

def on_tcp_connection_loss(*)
  error { 'TCP connection loss' }
  periodically_reconnect
end