Class: Daemon::Supervision::Registry

Inherits:
Utils::AutoDeletedHash show all
Includes:
Utils::Loggable
Defined in:
lib/onapp/engine/supervision/registry.rb

Constant Summary

SEPARATOR =
'.'.freeze
KEY =
"workers#{SEPARATOR}%s#{SEPARATOR}%s".freeze
LOCK_KEY =
"workers-lock.%s".freeze
LOCK_TIMEOUT =
5
LOCK_EXPIRATION =
30

Constants inherited from Utils::AutoDeletedHash

Utils::AutoDeletedHash::EXPIRATION_TIME, Utils::AutoDeletedHash::REFRESH_TIME

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Methods included from Utils::Loggable

#debug, #error, #fatal, #info, #lwarn

Methods inherited from Utils::AutoDeletedHash

#refresh, #size

Constructor Details

- (Registry) initialize(name, id: self.class.id, auto_delete: true, &allocator)

Returns a new instance of Registry



53
54
55
56
57
58
59
60
# File 'lib/onapp/engine/supervision/registry.rb', line 53

def initialize(name, id: self.class.id, auto_delete: true, &allocator)
  @name = self.class.normalize_key(name)
  @allocator = allocator || proc{}
  @id = id
  @facility = "registry #{name}-#{id}"
  @auto_delete = auto_delete
  super(key, auto_delete: auto_delete)
end

Instance Attribute Details

- (Object) auto_delete (readonly)

Returns the value of attribute auto_delete



20
21
22
# File 'lib/onapp/engine/supervision/registry.rb', line 20

def auto_delete
  @auto_delete
end

- (Object) id (readonly)

Returns the value of attribute id



20
21
22
# File 'lib/onapp/engine/supervision/registry.rb', line 20

def id
  @id
end

- (Object) name (readonly)

Returns the value of attribute name



20
21
22
# File 'lib/onapp/engine/supervision/registry.rb', line 20

def name
  @name
end

Class Method Details

+ (Object) clear



23
24
25
# File 'lib/onapp/engine/supervision/registry.rb', line 23

def clear
  workers.each(&:clear)
end

+ (Object) id



44
45
46
# File 'lib/onapp/engine/supervision/registry.rb', line 44

def id
  @id ||= normalize_key(Daemon::Utils::Id.generate('supervision-registry'))
end

+ (Object) normalize_key(key)



48
49
50
# File 'lib/onapp/engine/supervision/registry.rb', line 48

def normalize_key(key)
  key.gsub(SEPARATOR, '')
end

+ (Object) redis



27
28
29
# File 'lib/onapp/engine/supervision/registry.rb', line 27

def redis
  Daemon.redis
end

+ (Object) registry_ids



31
32
33
34
35
# File 'lib/onapp/engine/supervision/registry.rb', line 31

def registry_ids
  redis.scan_each(match: KEY % %w(* *)).map do |key|
    key.split(SEPARATOR)[1]
  end
end

+ (Object) workers(registry_id = '*', worker_name = '*')



37
38
39
40
41
42
# File 'lib/onapp/engine/supervision/registry.rb', line 37

def workers(registry_id = '*', worker_name = '*')
  redis.scan_each(match: KEY % [registry_id, worker_name]).map do |key|
    _, id, name = key.split(SEPARATOR)
    new(name, id: id, auto_delete: false)
  end
end

Instance Method Details

- (Object) allocate_worker



79
80
81
# File 'lib/onapp/engine/supervision/registry.rb', line 79

def allocate_worker
  @allocator.call
end

- (Object) key



62
63
64
# File 'lib/onapp/engine/supervision/registry.rb', line 62

def key
  @key ||= KEY % [id, name]
end

- (Object) lock(&block)



118
119
120
# File 'lib/onapp/engine/supervision/registry.rb', line 118

def lock(&block)
  redis_lock.lock(&block)
end

- (Object) logger



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/onapp/engine/supervision/registry.rb', line 66

def logger
  @logger ||=
    begin
      worker = allocate_worker

      if worker.respond_to?(:logger)
        worker.logger
      else
        Daemon.logger
      end
    end
end

- (Object) redis_lock



122
123
124
125
126
127
# File 'lib/onapp/engine/supervision/registry.rb', line 122

def redis_lock
  @redis_lock ||=
    ::Redis::Lock.new(LOCK_KEY % name,
                      timeout: LOCK_TIMEOUT,
                      expiration: LOCK_EXPIRATION)
end

- (Object) register(task_id = 0, task_payload = Time.now.to_s)



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/onapp/engine/supervision/registry.rb', line 87

def register(task_id = 0, task_payload = Time.now.to_s)
  lock do
    if registered?(task_id)
      false
    else
      store(task_id, task_payload)
      thread_registry[:task_id] = task_id
      thread_registry[:task_payload] = task_payload
      allocate_worker
    end
  end
rescue => ex
  error { "Can't register worker for task #{task_id.inspect}. #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
  false
end

- (Boolean) registered?(task_id)

Returns:

  • (Boolean)


83
84
85
# File 'lib/onapp/engine/supervision/registry.rb', line 83

def registered?(task_id)
  registries.any? { |registry| registry.has_key?(task_id) }
end

- (Object) registries



131
132
133
# File 'lib/onapp/engine/supervision/registry.rb', line 131

def registries
  self.class.workers('*', name)
end

- (Object) thread_registry



114
115
116
# File 'lib/onapp/engine/supervision/registry.rb', line 114

def thread_registry
  @thread_registry ||= Daemon::Utils::PerThreadRegistry.new
end

- (Object) total_count



135
136
137
138
139
# File 'lib/onapp/engine/supervision/registry.rb', line 135

def total_count
  lock do
    registries.map(&:size).inject(0, :+)
  end
end

- (Object) unregister(task_id = thread_registry[:task_id])



103
104
105
106
107
108
109
110
111
112
# File 'lib/onapp/engine/supervision/registry.rb', line 103

def unregister(task_id = thread_registry[:task_id])
  lock do
    registries.each { |w| w.delete(task_id) }
    thread_registry.delete(:task_id)
    thread_registry.delete(:task_payload)
  end

rescue => ex
  error { "Can't unregister worker for task #{task_id.inspect}. #{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
end