Class: Daemon::Messaging::ConnectionManager
- Inherits:
-
Object
- Object
- Daemon::Messaging::ConnectionManager
- Includes:
- Utils::Loggable
- Defined in:
- lib/onapp/engine/messaging/connection_manager.rb
Overview
Manages amqp connection Configures heartbeat and recovery. Notifies about connection-errors.
Constant Summary
- PERIODICAL_RECONNECT_PERIOD =
Used in recovery
30- HEARTBEAT =
60- REQUIRED_VERSION =
Minimal required version Used to check current version of rabbitmq and possibly show warning
Gem::Version.new('3.5')
Instance Attribute Summary (collapse)
-
- (Object) connection
readonly
Returns the value of attribute connection.
Attributes included from Utils::Loggable
Instance Method Summary (collapse)
-
- (Object) before_recovery
Before-recovery handler.
-
- (AMQP::Session) connect(spec)
RabbitMQ connection.
-
- (ConnectionManager) initialize(logger = Daemon.logger)
constructor
A new instance of ConnectionManager.
-
- (Object) on_error(connection, connection_close)
AMQP exceptions.
-
- (Object) on_open
Validates version.
-
- (Object) on_possible_authentication_failure
Handles authentication failure.
-
- (Object) on_recovery
After-recovery handler.
-
- (Object) on_tcp_connection_failure
Handles tcp connection failure.
-
- (Object) on_tcp_connection_loss
Handles network connection interrupts.
Methods included from Utils::Loggable
#debug, #error, #fatal, #info, #lwarn
Constructor Details
- (ConnectionManager) initialize(logger = Daemon.logger)
Returns a new instance of ConnectionManager
27 28 29 30 31 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 27 def initialize(logger = Daemon.logger) self.logger = logger # see Daemon::Utils::Loggable @facility = 'RabbitMQ' end |
Instance Attribute Details
- (Object) connection (readonly)
Returns the value of attribute connection
24 25 26 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 24 def connection @connection end |
Instance Method Details
- (Object) before_recovery
Before-recovery handler
94 95 96 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 94 def before_recovery(*) info { "TCP connection recovered" } end |
- (AMQP::Session) connect(spec)
Returns RabbitMQ connection
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 36 def connect(spec) @connection = AMQP.connect(spec.merge( auto_recovery: true, heartbeat: HEARTBEAT, on_tcp_connection_failure: method(:on_tcp_connection_failure), on_possible_authentication_failure: method(:on_possible_authentication_failure) )) connection.logger = logger connection.on_open(&method(:on_open)) connection.on_error(&method(:on_error)) connection.on_tcp_connection_loss(&method(:on_tcp_connection_loss)) connection.on_recovery(&method(:on_recovery)) connection.before_recovery(&method(:before_recovery)) connection end |
- (Object) on_error(connection, connection_close)
AMQP exceptions
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 77 def on_error(connection, connection_close) error { <<MSG Connection-level exception: AMQP class id: #{connection_close.class_id} AMQP method id: #{connection_close.method_id} Status code: #{connection_close.reply_code} Error message: #{connection_close.reply_text} MSG } if connection_close.reply_code == 320 periodically_reconnect end end |
- (Object) on_open
Validates version
104 105 106 107 108 109 110 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 104 def on_open(*) if current_version < REQUIRED_VERSION lwarn { "Bad version #{current_version.to_s} < #{REQUIRED_VERSION.to_s}" } end info { "AMQP connection established" } end |
- (Object) on_possible_authentication_failure
Handles authentication failure
63 64 65 66 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 63 def on_possible_authentication_failure(*) error { 'looks like authentication failed' } periodically_reconnect end |
- (Object) on_recovery
After-recovery handler
99 100 101 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 99 def on_recovery(*) info { "AMQP connection recovered" } end |
- (Object) on_tcp_connection_failure
Handles tcp connection failure
56 57 58 59 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 56 def on_tcp_connection_failure(*) error { 'TCP connection failure' } periodically_reconnect end |
- (Object) on_tcp_connection_loss
Handles network connection interrupts
70 71 72 73 |
# File 'lib/onapp/engine/messaging/connection_manager.rb', line 70 def on_tcp_connection_loss(*) error { 'TCP connection loss' } periodically_reconnect end |