Class: Daemon::Supervision::Registry
- Inherits:
-
Utils::AutoDeletedHash
show all
- Includes:
- Utils::Loggable
- Defined in:
- lib/onapp/engine/supervision/registry.rb
Constant Summary
- SEPARATOR =
'.'.freeze
- KEY =
"workers#{SEPARATOR}%s#{SEPARATOR}%s".freeze
- LOCK_KEY =
"workers-lock.%s".freeze
- LOCK_TIMEOUT =
5
- LOCK_EXPIRATION =
30
Utils::AutoDeletedHash::EXPIRATION_TIME, Utils::AutoDeletedHash::REFRESH_TIME
Instance Attribute Summary (collapse)
Class Method Summary
(collapse)
Instance Method Summary
(collapse)
#debug, #error, #fatal, #info, #lwarn
#refresh, #size
Constructor Details
- (Registry) initialize(name, id: self.class.id, auto_delete: true, &allocator)
Returns a new instance of Registry
53
54
55
56
57
58
59
60
|
# File 'lib/onapp/engine/supervision/registry.rb', line 53
def initialize(name, id: self.class.id, auto_delete: true, &allocator)
@name = self.class.normalize_key(name)
@allocator = allocator || proc{}
@id = id
@facility = "registry #{name}-#{id}"
@auto_delete = auto_delete
super(key, auto_delete: auto_delete)
end
|
Instance Attribute Details
- (Object) auto_delete
Returns the value of attribute auto_delete
20
21
22
|
# File 'lib/onapp/engine/supervision/registry.rb', line 20
def auto_delete
@auto_delete
end
|
- (Object) id
Returns the value of attribute id
20
21
22
|
# File 'lib/onapp/engine/supervision/registry.rb', line 20
def id
@id
end
|
- (Object) name
Returns the value of attribute name
20
21
22
|
# File 'lib/onapp/engine/supervision/registry.rb', line 20
def name
@name
end
|
Class Method Details
+ (Object) clear
23
24
25
|
# File 'lib/onapp/engine/supervision/registry.rb', line 23
def clear
workers.each(&:clear)
end
|
+ (Object) id
44
45
46
|
# File 'lib/onapp/engine/supervision/registry.rb', line 44
def id
@id ||= normalize_key(Daemon::Utils::Id.generate('supervision-registry'))
end
|
+ (Object) normalize_key(key)
48
49
50
|
# File 'lib/onapp/engine/supervision/registry.rb', line 48
def normalize_key(key)
key.gsub(SEPARATOR, '')
end
|
+ (Object) redis
27
28
29
|
# File 'lib/onapp/engine/supervision/registry.rb', line 27
def redis
Daemon.redis
end
|
+ (Object) registry_ids
31
32
33
34
35
|
# File 'lib/onapp/engine/supervision/registry.rb', line 31
def registry_ids
redis.scan_each(match: KEY % %w(* *)).map do |key|
key.split(SEPARATOR)[1]
end
end
|
+ (Object) workers(registry_id = '*', worker_name = '*')
37
38
39
40
41
42
|
# File 'lib/onapp/engine/supervision/registry.rb', line 37
def workers(registry_id = '*', worker_name = '*')
redis.scan_each(match: KEY % [registry_id, worker_name]).map do |key|
_, id, name = key.split(SEPARATOR)
new(name, id: id, auto_delete: false)
end
end
|
Instance Method Details
- (Object) allocate_worker
79
80
81
|
# File 'lib/onapp/engine/supervision/registry.rb', line 79
def allocate_worker
@allocator.call
end
|
- (Object) key
62
63
64
|
# File 'lib/onapp/engine/supervision/registry.rb', line 62
def key
@key ||= KEY % [id, name]
end
|
- (Object) lock(&block)
118
119
120
|
# File 'lib/onapp/engine/supervision/registry.rb', line 118
def lock(&block)
redis_lock.lock(&block)
end
|
- (Object) logger
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/onapp/engine/supervision/registry.rb', line 66
def logger
@logger ||=
begin
worker = allocate_worker
if worker.respond_to?(:logger)
worker.logger
else
Daemon.logger
end
end
end
|
- (Object) redis_lock
122
123
124
125
126
127
|
# File 'lib/onapp/engine/supervision/registry.rb', line 122
def redis_lock
@redis_lock ||=
::Redis::Lock.new(LOCK_KEY % name,
timeout: LOCK_TIMEOUT,
expiration: LOCK_EXPIRATION)
end
|
- (Object) register(task_id = 0, task_payload = Time.now.to_s)
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/onapp/engine/supervision/registry.rb', line 87
def register(task_id = 0, task_payload = Time.now.to_s)
lock do
if registered?(task_id)
false
else
store(task_id, task_payload)
thread_registry[:task_id] = task_id
thread_registry[:task_payload] = task_payload
allocate_worker
end
end
rescue => ex
error { "Can't register worker for task #{task_id.inspect}. #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
false
end
|
- (Boolean) registered?(task_id)
83
84
85
|
# File 'lib/onapp/engine/supervision/registry.rb', line 83
def registered?(task_id)
registries.any? { |registry| registry.has_key?(task_id) }
end
|
- (Object) registries
131
132
133
|
# File 'lib/onapp/engine/supervision/registry.rb', line 131
def registries
self.class.workers('*', name)
end
|
- (Object) thread_registry
114
115
116
|
# File 'lib/onapp/engine/supervision/registry.rb', line 114
def thread_registry
@thread_registry ||= Daemon::Utils::PerThreadRegistry.new
end
|
- (Object) total_count
135
136
137
138
139
|
# File 'lib/onapp/engine/supervision/registry.rb', line 135
def total_count
lock do
registries.map(&:size).inject(0, :+)
end
end
|
- (Object) unregister(task_id = thread_registry[:task_id])
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/onapp/engine/supervision/registry.rb', line 103
def unregister(task_id = thread_registry[:task_id])
lock do
registries.each { |w| w.delete(task_id) }
thread_registry.delete(:task_id)
thread_registry.delete(:task_payload)
end
rescue => ex
error { "Can't unregister worker for task #{task_id.inspect}. #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
end
|