Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M153: Multiple Broker Support (supporting changes for pyon PR) #2178

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions ion/core/process/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ def __init__(self):
super(TransformBase,self).__init__()
self._stats = {} # Container for statistics information

def on_start(self):
'''
Begins listening for incoming RPC calls.
'''
super(TransformBase,self).on_start()
self._rpc_server = self.container.proc_manager._create_listening_endpoint(from_name=self.id,
process=self)
self.add_endpoint(self._rpc_server)

def _stat(self):
return self._stats

Expand Down
24 changes: 13 additions & 11 deletions ion/services/coi/exchange_management_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class ExchangeManagementService(BaseExchangeManagementService):
The Exchange Management Service is the service that manages the Exchange and its associated resources, such as Exchange Spaces, Names, Points and Brokers.

"""
typemap = { 'XN_SERVICE':'service', 'XN_PROCESS':'process', 'XN_QUEUE':'queue' }

def create_exchange_space(self, exchange_space=None, org_id=''):
"""Creates an Exchange Space distributed resource from the parameter exchange_space object.

Expand Down Expand Up @@ -108,8 +110,8 @@ def delete_exchange_space(self, exchange_space_id=''):
# delete XS now
self.container.resource_registry.delete(exchange_space_id)

# call container API to delete @TODO this is clunky
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
# call container API to delete
xs = self.container.ex_manager.create_xs(exchange_space.name, use_ems=False, declare=False)
self.container.ex_manager.delete_xs(xs, use_ems=False)

def find_exchange_spaces(self, filters=None):
Expand All @@ -130,8 +132,7 @@ def declare_exchange_name(self, exchange_name=None, exchange_space_id=''):
@throws BadRequest if object passed has _id or _rev attribute
"""
# get xntype and translate @TODO should we just consolidate these to be the same?
typemap = { 'XN_SERVICE':'service', 'XN_PROCESS':'process', 'XN_QUEUE':'queue' }
if not exchange_name.xn_type in typemap:
if not exchange_name.xn_type in self.typemap:
raise BadRequest("Unknown exchange name type: %s" % exchange_name.xn_type)

xns, assocs = self.container.resource_registry.find_objects(subject=exchange_space_id, predicate=PRED.hasExchangeName, id_only=False)
Expand All @@ -141,7 +142,7 @@ def declare_exchange_name(self, exchange_name=None, exchange_space_id=''):
exchange_name_id = xn._id


xntype = typemap[exchange_name.xn_type]
xntype = self.typemap[exchange_name.xn_type]

exchange_space = self.read_exchange_space(exchange_space_id)
if not exchange_name_id:
Expand All @@ -150,7 +151,7 @@ def declare_exchange_name(self, exchange_name=None, exchange_space_id=''):
aid = self.container.resource_registry.create_association(exchange_space_id, PRED.hasExchangeName, exchange_name_id)

# call container API
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xs = self.container.ex_manager.create_xs(exchange_space.name, use_ems=False)
self.container.ex_manager._create_xn(xntype, exchange_name.name, xs, use_ems=False)

return exchange_name_id #QUestion - is this the correct canonical name?
Expand Down Expand Up @@ -187,8 +188,9 @@ def undeclare_exchange_name(self, canonical_name='', exchange_space_id=''):
self.container.resource_registry.delete(exchange_name_id)

# call container API
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xn = exchange.ExchangeName(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_name.name, xs) # type doesn't matter here
xntype = self.typemap[exchange_name.xn_type]
xs = self.container.ex_manager.create_xs(exchange_space.name, use_ems=False, declare=False)
xn = self.container.ex_manager._create_xn(xntype, exchange_name.name, xs, use_ems=False, declare=False)
self.container.ex_manager.delete_xn(xn, use_ems=False)

def find_exchange_names(self, filters=None):
Expand Down Expand Up @@ -222,7 +224,7 @@ def create_exchange_point(self, exchange_point=None, exchange_space_id=''):
self.container.resource_registry.create_association(exchange_space_id, PRED.hasExchangePoint, exchange_point_id)

# call container API
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xs = self.container.ex_manager.create_xs(exchange_space.name, use_ems=False)
self.container.ex_manager.create_xp(exchange_point.name, xs, xptype=exchange_point.topology_type, use_ems=False)

return exchange_point_id
Expand Down Expand Up @@ -276,8 +278,8 @@ def delete_exchange_point(self, exchange_point_id=''):
self.container.resource_registry.delete(exchange_point_id)

# call container API
xs = exchange.ExchangeSpace(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_space.name)
xp = exchange.ExchangePoint(self.container.ex_manager, self.container.ex_manager._priviledged_transport, exchange_point.name, xs, 'ttree')
xs = self.container.ex_manager.create_xs(exchange_space.name, use_ems=False, declare=False)
xp = self.container.ex_manager.create_xp(exchange_point.name, xs, xptype=exchange_point.topology_type, use_ems=False, declare=False)
self.container.ex_manager.delete_xp(xp, use_ems=False)

def find_exchange_points(self, filters=None):
Expand Down