added SNMPv3 support

This change introduces SNMPv3 support along with many encryption
protocols used in SNMP.

Change-Id: I891a8526d3896b6194d0c812b89c9f0130301102
Task: 12072
Story: 1751164
This commit is contained in:
Ilya Etingof 2018-03-27 19:26:06 +02:00
parent ae398e612c
commit 765cf1be19
10 changed files with 437 additions and 101 deletions

View File

@ -1,13 +1,34 @@
[global] [global]
libvirt_uri=test:///default libvirt_uri=test:///default
debug_snmp=no debug_snmp=no
[my_pdu] [my_pdu]
listen_address=127.0.0.1 listen_address=127.0.0.1
listen_port=9998 listen_port=9998
snmp_versions = 1,2c
# SNMPv1 & SNMPv2c
community=public community=public
# Managed SNMP objects
ports=5:test ports=5:test
[my_second_pdu] [my_second_pdu]
listen_address=127.0.0.1 listen_address=127.0.0.1
listen_port=9997 listen_port=9997
community=public
snmp_versions = 3
# SNMPv3
engine_id=0x80010203040506070809
context_engine_id=0x80010203040506070809
context_name=
user=openstack
auth_key=openstack
auth_protocol=MD5
priv_key=openstack
priv_protocol=DES
# Managed SNMP objects
ports=2:test ports=2:test

View File

@ -36,35 +36,79 @@ def main():
sys.stderr.write(MISSING_CONFIG_MESSAGE) sys.stderr.write(MISSING_CONFIG_MESSAGE)
return 1 return 1
else: else:
config = configparser.RawConfigParser({'debug_snmp': 'no'}) config = configparser.RawConfigParser(
{'debug_snmp': 'no',
'snmp_versions': '1,2c',
# SNMPv2c
'community': None,
# SNMPv3
'engine_id': None,
'context_engine_id': None,
'context_name': '',
'user': None,
'auth_key': None,
'auth_protocol': None,
'priv_key': None,
'priv_protocol': None}
)
config.read(config_file) config.read(config_file)
driver = get_driver_from_config(config) driver = get_driver_from_config(config)
mapping = get_mapping_for_config(config) mapping = get_mapping_for_config(config)
outlet_default_state = get_default_state_from_config(config) outlet_default_state = get_default_state_from_config(config)
debug_snmp = config.get('global', 'debug_snmp') debug_snmp = config.get('global', 'debug_snmp')
core = virtualpdu.core.Core(driver=driver, mapping=mapping, store={}, core = virtualpdu.core.Core(driver=driver, mapping=mapping, store={},
default_state=outlet_default_state) default_state=outlet_default_state)
pdu_threads = [] pdu_threads = []
for pdu in [s for s in config.sections() if s != 'global']: for pdu in [s for s in config.sections() if s != 'global']:
listen_address = config.get(pdu, 'listen_address')
port = int(config.get(pdu, 'listen_port'))
community = config.get(pdu, 'community')
apc_pdu = apc_rackpdu.APCRackPDU(pdu, core) apc_pdu = apc_rackpdu.APCRackPDU(pdu, core)
pdu_threads.append( listen_address = config.get(pdu, 'listen_address')
pysnmp_handler.SNMPPDUHarness( listen_port = int(config.get(pdu, 'listen_port'))
apc_pdu,
listen_address, snmp_versions = config.get(pdu, 'snmp_versions')
port, snmp_versions = [x.strip() for x in snmp_versions.split(',')]
community,
debug_snmp=debug_snmp in ('yes', 'true', '1') # SNMPv1/v2c options
) community = config.get(pdu, 'community')
# SNMPv3 options
engine_id = config.get(pdu, 'engine_id')
if engine_id and engine_id.startswith('0x'):
engine_id = engine_id[2:]
context_engine_id = config.get(pdu, 'context_engine_id')
if context_engine_id and context_engine_id.startswith('0x'):
context_engine_id = context_engine_id[2:]
context_name = config.get(pdu, 'context_name')
user = config.get(pdu, 'user')
auth_key = config.get(pdu, 'auth_key')
auth_protocol = config.get(pdu, 'auth_protocol')
priv_key = config.get(pdu, 'priv_key')
priv_protocol = config.get(pdu, 'priv_protocol')
snmp_harness = pysnmp_handler.SNMPPDUHarness(
apc_pdu,
listen_address,
listen_port,
snmp_versions=snmp_versions,
community=community,
engine_id=engine_id,
context_engine_id=context_engine_id,
context_name=context_name,
user=user,
auth_key=auth_key,
auth_protocol=auth_protocol,
priv_key=priv_key,
priv_protocol=priv_protocol,
debug_snmp=debug_snmp in ('yes', 'true', '1')
) )
pdu_threads.append(snmp_harness)
for t in pdu_threads: for t in pdu_threads:
t.start() t.start()

View File

@ -28,62 +28,106 @@ from pysnmp.proto.api import v2c
from virtualpdu.pdu import TraversableOidMapping from virtualpdu.pdu import TraversableOidMapping
auth_protocols = {
'MD5': config.usmHMACMD5AuthProtocol,
'SHA': config.usmHMACSHAAuthProtocol,
'NONE': config.usmNoAuthProtocol
}
# Some auth protocols may not be available in older pysnmp versions
try:
auth_protocols['SHA224'] = config.usmHMAC128SHA224AuthProtocol
auth_protocols['SHA256'] = config.usmHMAC192SHA256AuthProtocol
auth_protocols['SHA384'] = config.usmHMAC256SHA384AuthProtocol
auth_protocols['SHA512'] = config.usmHMAC384SHA512AuthProtocol
except AttributeError:
pass
priv_protocols = {
'DES': config.usmDESPrivProtocol,
'3DES': config.usm3DESEDEPrivProtocol,
'AES': config.usmAesCfb128Protocol,
'AES128': config.usmAesCfb128Protocol,
'AES192': config.usmAesCfb192Protocol,
'AES256': config.usmAesCfb256Protocol,
'NONE': config.usmNoPrivProtocol
}
# Some privacy protocols may not be available in older pysnmp versions
try:
priv_protocols['AES192BLMT'] = config.usmAesBlumenthalCfb192Protocol
priv_protocols['AES256BLMT'] = config.usmAesBlumenthalCfb256Protocol
except AttributeError:
pass
class GetCommandResponder(cmdrsp.GetCommandResponder): class GetCommandResponder(cmdrsp.GetCommandResponder):
def __init__(self, snmpEngine, snmpContext, power_unit): def __init__(self, snmpEngine, snmpContext, context_name, power_unit):
super(GetCommandResponder, self).__init__(snmpEngine, snmpContext) super(GetCommandResponder, self).__init__(snmpEngine, snmpContext)
self.__context_name = v2c.OctetString(context_name)
self.__power_unit = power_unit self.__power_unit = power_unit
def handleMgmtOperation(self, snmpEngine, stateReference, def handleMgmtOperation(self, snmpEngine, stateReference,
contextName, req_pdu, acInfo): contextName, req_pdu, acInfo):
var_binds = [] if self.__context_name == contextName:
for oid, val in v2c.apiPDU.getVarBinds(req_pdu): var_binds = []
var_binds.append(
(oid, (self.__power_unit.oid_mapping[oid].value
if oid in self.__power_unit.oid_mapping
else v2c.NoSuchInstance('')))
)
self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds) for oid, val in v2c.apiPDU.getVarBinds(req_pdu):
var_binds.append(
(oid, (self.__power_unit.oid_mapping[oid].value
if oid in self.__power_unit.oid_mapping
else v2c.NoSuchInstance('')))
)
self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds)
self.releaseStateInformation(stateReference) self.releaseStateInformation(stateReference)
class NextCommandResponder(cmdrsp.NextCommandResponder): class NextCommandResponder(cmdrsp.NextCommandResponder):
def __init__(self, snmpEngine, snmpContext, power_unit): def __init__(self, snmpEngine, snmpContext, context_name, power_unit):
super(NextCommandResponder, self).__init__(snmpEngine, snmpContext) super(NextCommandResponder, self).__init__(snmpEngine, snmpContext)
self.__context_name = v2c.OctetString(context_name)
self.__power_unit = power_unit self.__power_unit = power_unit
def handleMgmtOperation(self, snmpEngine, stateReference, def handleMgmtOperation(self, snmpEngine, stateReference,
contextName, req_pdu, acInfo): contextName, req_pdu, acInfo):
oid_map = TraversableOidMapping(self.__power_unit.oid_mapping) if self.__context_name == contextName:
var_binds = [] oid_map = TraversableOidMapping(self.__power_unit.oid_mapping)
for oid, val in v2c.apiPDU.getVarBinds(req_pdu): var_binds = []
try: for oid, val in v2c.apiPDU.getVarBinds(req_pdu):
oid = oid_map.next(to=oid)
val = self.__power_unit.oid_mapping[oid].value
except (KeyError, IndexError): try:
val = v2c.NoSuchInstance('') oid = oid_map.next(to=oid)
val = self.__power_unit.oid_mapping[oid].value
var_binds.append((oid, val)) except (KeyError, IndexError):
val = v2c.NoSuchInstance('')
self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds) var_binds.append((oid, val))
self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds)
self.releaseStateInformation(stateReference) self.releaseStateInformation(stateReference)
class SetCommandResponder(cmdrsp.SetCommandResponder): class SetCommandResponder(cmdrsp.SetCommandResponder):
def __init__(self, snmpEngine, snmpContext, power_unit): def __init__(self, snmpEngine, snmpContext, context_name, power_unit):
super(SetCommandResponder, self).__init__(snmpEngine, snmpContext) super(SetCommandResponder, self).__init__(snmpEngine, snmpContext)
self.__context_name = v2c.OctetString(context_name)
self.__power_unit = power_unit self.__power_unit = power_unit
self.__logger = logging.getLogger(__name__) self.__logger = logging.getLogger(__name__)
@ -91,33 +135,52 @@ class SetCommandResponder(cmdrsp.SetCommandResponder):
def handleMgmtOperation(self, snmpEngine, stateReference, def handleMgmtOperation(self, snmpEngine, stateReference,
contextName, req_pdu, acInfo): contextName, req_pdu, acInfo):
var_binds = [] if self.__context_name == contextName:
for oid, val in v2c.apiPDU.getVarBinds(req_pdu): var_binds = []
if oid in self.__power_unit.oid_mapping:
try:
self.__power_unit.oid_mapping[oid].value = val
except Exception as ex: for oid, val in v2c.apiPDU.getVarBinds(req_pdu):
self.__logger.info( if oid in self.__power_unit.oid_mapping:
'Set value {} on power unit {} failed: {}'.format( try:
val, self.__power_unit.name, ex self.__power_unit.oid_mapping[oid].value = val
except Exception as ex:
self.__logger.info(
'Set value {} on power unit {} failed: {}'.format(
val, self.__power_unit.name, ex
)
) )
) val = v2c.NoSuchInstance('')
else:
val = v2c.NoSuchInstance('') val = v2c.NoSuchInstance('')
else:
val = v2c.NoSuchInstance('')
var_binds.append((oid, val)) var_binds.append((oid, val))
self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds) self.sendRsp(snmpEngine, stateReference, 0, 0, var_binds)
self.releaseStateInformation(stateReference) self.releaseStateInformation(stateReference)
def create_snmp_engine(power_unit, listen_address, listen_port, def create_snmp_engine(power_unit,
community="public"): listen_address, listen_port,
snmp_engine = engine.SnmpEngine() **snmp_options):
snmp_versions = snmp_options.get('snmp_versions', [])
community = snmp_options.get('community')
engine_id = snmp_options.get('engine_id')
if engine_id:
engine_id = v2c.OctetString(hexValue=engine_id)
context_engine_id = snmp_options.get('context_engine_id')
if context_engine_id:
context_engine_id = v2c.OctetString(hexValue=context_engine_id)
context_name = snmp_options.get('context_name', '')
user = snmp_options.get('user')
auth_key = snmp_options.get('auth_key')
auth_protocol = auth_protocols[snmp_options.get('auth_protocol') or 'NONE']
priv_key = snmp_options.get('priv_key')
priv_protocol = priv_protocols[snmp_options.get('priv_protocol') or 'NONE']
snmp_engine = engine.SnmpEngine(snmpEngineID=engine_id)
config.addSocketTransport( config.addSocketTransport(
snmp_engine, snmp_engine,
@ -125,19 +188,57 @@ def create_snmp_engine(power_unit, listen_address, listen_port,
udp.UdpTransport().openServerMode((listen_address, listen_port)) udp.UdpTransport().openServerMode((listen_address, listen_port))
) )
config.addV1System(snmp_engine, community, community) # SNMPv1
if '1' in snmp_versions:
config.addV1System(snmp_engine, community, community)
# Allow read MIB access for this user / securityModels at SNMP VACM # Allow read MIB access for this user / securityModels at SNMP VACM
for snmp_version in (1, 2): config.addVacmUser(snmp_engine, 1,
config.addVacmUser(snmp_engine, snmp_version,
community, 'noAuthNoPriv', (1,), (1,)) community, 'noAuthNoPriv', (1,), (1,))
snmp_context = context.SnmpContext(snmp_engine) # SNMPv1
if '2c' in snmp_versions:
config.addV1System(snmp_engine, community, community)
# Allow read MIB access for this user / securityModels at SNMP VACM
config.addVacmUser(snmp_engine, 2,
community, 'noAuthNoPriv', (1,), (1,))
# SNMPv3/USM setup
if '3' in snmp_versions:
config.addV3User(
snmp_engine, user,
auth_protocol, auth_key,
priv_protocol, priv_key
)
if (auth_protocol != config.usmNoAuthProtocol and
priv_protocol != config.usmNoPrivProtocol):
sec_level = 'authPriv'
elif priv_protocol != config.usmNoAuthProtocol:
sec_level = 'authNoPriv'
else:
sec_level = 'noAuthNoPriv'
config.addVacmUser(snmp_engine, 3,
user, sec_level, (1,), (1,))
# SNMP context name is not actually used because we intercept
# MIB management calls by overriding `handleMgmtOperation()`
snmp_context = context.SnmpContext(snmp_engine,
contextEngineId=context_engine_id)
else:
snmp_context = context.SnmpContext(snmp_engine)
# Register SNMP Apps at the SNMP engine for particular SNMP context # Register SNMP Apps at the SNMP engine for particular SNMP context
GetCommandResponder(snmp_engine, snmp_context, power_unit=power_unit) GetCommandResponder(snmp_engine, snmp_context,
NextCommandResponder(snmp_engine, snmp_context, power_unit=power_unit) context_name=context_name, power_unit=power_unit)
SetCommandResponder(snmp_engine, snmp_context, power_unit=power_unit) NextCommandResponder(snmp_engine, snmp_context,
context_name=context_name, power_unit=power_unit)
SetCommandResponder(snmp_engine, snmp_context,
context_name=context_name, power_unit=power_unit)
return snmp_engine return snmp_engine
@ -145,17 +246,20 @@ def create_snmp_engine(power_unit, listen_address, listen_port,
class SNMPPDUHarness(threading.Thread): class SNMPPDUHarness(threading.Thread):
def __init__(self, power_unit, def __init__(self, power_unit,
listen_address, listen_port, listen_address, listen_port,
community="public", **snmp_options):
debug_snmp=False):
super(SNMPPDUHarness, self).__init__() super(SNMPPDUHarness, self).__init__()
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
if debug_snmp: if snmp_options.get('debug_snmp'):
debug.setLogger(debug.Debug('all')) debug.setLogger(debug.Debug('all'))
self.snmp_engine = create_snmp_engine(power_unit, listen_address, self.snmp_engine = create_snmp_engine(
listen_port, community) power_unit,
listen_address, listen_port,
**snmp_options
)
self.listen_address = listen_address self.listen_address = listen_address
self.listen_port = listen_port self.listen_port = listen_port

View File

@ -34,7 +34,8 @@ class PDUTestCase(base.TestCase):
self.pdu, self.pdu,
'127.0.0.1', '127.0.0.1',
random.randint(20000, 30000), random.randint(20000, 30000),
self.community) snmp_versions=['1', '2c'],
community=self.community)
self.pdu_test_harness.start() self.pdu_test_harness.start()
def tearDown(self): def tearDown(self):
@ -45,7 +46,7 @@ class PDUTestCase(base.TestCase):
s = snmp_client.SnmpClient(cmdgen, s = snmp_client.SnmpClient(cmdgen,
self.pdu_test_harness.listen_address, self.pdu_test_harness.listen_address,
self.pdu_test_harness.listen_port, self.pdu_test_harness.listen_port,
community or self.community, community=community or self.community,
timeout=1, timeout=1,
retries=1) retries=1)
return s.get_one(oid) return s.get_one(oid)
@ -54,7 +55,7 @@ class PDUTestCase(base.TestCase):
s = snmp_client.SnmpClient(cmdgen, s = snmp_client.SnmpClient(cmdgen,
self.pdu_test_harness.listen_address, self.pdu_test_harness.listen_address,
self.pdu_test_harness.listen_port, self.pdu_test_harness.listen_port,
community or self.community, community=community or self.community,
timeout=1, timeout=1,
retries=1) retries=1)
return s.get_next(oid) return s.get_next(oid)
@ -63,7 +64,7 @@ class PDUTestCase(base.TestCase):
s = snmp_client.SnmpClient(cmdgen, s = snmp_client.SnmpClient(cmdgen,
self.pdu_test_harness.listen_address, self.pdu_test_harness.listen_address,
self.pdu_test_harness.listen_port, self.pdu_test_harness.listen_port,
community or self.community, community=community or self.community,
timeout=1, timeout=1,
retries=1) retries=1)

View File

@ -32,6 +32,7 @@ class TestSNMPPDUHarness(base.TestCase):
harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit, harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit,
listen_address='127.0.0.1', listen_address='127.0.0.1',
listen_port=port, listen_port=port,
snmp_versions=['1', '2c'],
community='bleh') community='bleh')
harness.start() harness.start()
@ -58,6 +59,7 @@ class TestSNMPPDUHarness(base.TestCase):
harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit, harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit,
listen_address='127.0.0.1', listen_address='127.0.0.1',
listen_port=port, listen_port=port,
snmp_versions=['1', '2c'],
community='bleh') community='bleh')
harness.start() harness.start()
@ -85,6 +87,7 @@ class TestSNMPPDUHarness(base.TestCase):
harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit, harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit,
listen_address='127.0.0.1', listen_address='127.0.0.1',
listen_port=port, listen_port=port,
snmp_versions=['1', '2c'],
community='bleh') community='bleh')
harness.start() harness.start()
@ -113,6 +116,7 @@ class TestSNMPPDUHarness(base.TestCase):
harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit, harness = pysnmp_handler.SNMPPDUHarness(power_unit=mock_power_unit,
listen_address='127.0.0.1', listen_address='127.0.0.1',
listen_port=port, listen_port=port,
snmp_versions=['1', '2c'],
community='bleh') community='bleh')
harness.start() harness.start()
@ -120,3 +124,108 @@ class TestSNMPPDUHarness(base.TestCase):
harness.join(timeout=5) harness.join(timeout=5)
self.assertFalse(harness.isAlive()) self.assertFalse(harness.isAlive())
class TestSNMPv3Operations(base.TestCase):
def setUp(self):
super(TestSNMPv3Operations, self).setUp()
self.mock_power_unit = mock.Mock()
self.mock_power_unit.oid_mapping = dict()
oid = (1, 3, 6, 99)
self.mock_power_unit.oid_mapping[oid] = mock.Mock()
self.mock_power_unit.oid_mapping[oid].value = univ.Integer(42)
def test_harness_none_none(self):
port = randint(20000, 30000)
harness = pysnmp_handler.SNMPPDUHarness(
power_unit=self.mock_power_unit,
listen_address='127.0.0.1',
listen_port=port,
snmp_versions=['3'],
user='openstack'
)
harness.start()
client = snmp_client.SnmpClient(
oneliner_cmdgen=cmdgen,
host='127.0.0.1',
port=port,
user='openstack',
timeout=1,
retries=1
)
self.assertEqual(42, client.get_one((1, 3, 6, 99)))
harness.stop()
def test_harness_md5_none(self):
port = randint(20000, 30000)
harness = pysnmp_handler.SNMPPDUHarness(
power_unit=self.mock_power_unit,
snmp_versions=['3'],
listen_address='127.0.0.1',
listen_port=port,
user='openstack',
auth_key='secretkey',
auth_protocol='MD5'
)
harness.start()
client = snmp_client.SnmpClient(
oneliner_cmdgen=cmdgen,
host='127.0.0.1',
port=port,
user='openstack',
auth_key='secretkey',
auth_protocol='MD5',
timeout=1,
retries=1
)
self.assertEqual(42, client.get_one((1, 3, 6, 99)))
harness.stop()
def test_harness_md5_des(self):
port = randint(20000, 30000)
harness = pysnmp_handler.SNMPPDUHarness(
power_unit=self.mock_power_unit,
listen_address='127.0.0.1',
listen_port=port,
snmp_versions=['3'],
user='openstack',
auth_key='secretkey',
auth_protocol='MD5',
priv_key='secretkey',
priv_protocol='DES'
)
harness.start()
client = snmp_client.SnmpClient(
oneliner_cmdgen=cmdgen,
host='127.0.0.1',
port=port,
user='openstack',
auth_key='secretkey',
auth_protocol='MD5',
priv_key='secretkey',
priv_protocol='DES',
timeout=1,
retries=1
)
self.assertEqual(42, client.get_one((1, 3, 6, 99)))
harness.stop()

View File

@ -47,16 +47,16 @@ class TestCoreIntegration(base.TestCase):
port = random.randint(20000, 30000) port = random.randint(20000, 30000)
community = 'public' community = 'public'
self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness(pdu_, self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness(
listen_address, pdu_, listen_address, port,
port, snmp_versions=['1', '2c'], community=community
community) )
self.pdu_test_harness.start() self.pdu_test_harness.start()
return snmp_client.SnmpClient(cmdgen, return snmp_client.SnmpClient(cmdgen,
listen_address, listen_address,
port, port,
community, community=community,
timeout=1, timeout=1,
retries=1) retries=1)

View File

@ -32,12 +32,14 @@ libvirt_uri=test:///default
[my_pdu] [my_pdu]
listen_address=127.0.0.1 listen_address=127.0.0.1
listen_port=9998 listen_port=9998
snmp_versions=1
community=public community=public
ports=5:test ports=5:test
[my_second_pdu] [my_second_pdu]
listen_address=127.0.0.1 listen_address=127.0.0.1
listen_port=9997 listen_port=9997
snmp_versions=1
community=public community=public
ports=2:test ports=2:test
""" """
@ -161,7 +163,7 @@ def _turn_off_outlet(community, listen_address, outlet, port):
snmp_client_ = snmp_client.SnmpClient(cmdgen, snmp_client_ = snmp_client.SnmpClient(cmdgen,
listen_address, listen_address,
port, port,
community, community=community,
timeout=1, timeout=1,
retries=1) retries=1)

View File

@ -12,30 +12,74 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from virtualpdu.pdu.pysnmp_handler import auth_protocols
from virtualpdu.pdu.pysnmp_handler import priv_protocols
from virtualpdu.tests import snmp_error_indications from virtualpdu.tests import snmp_error_indications
from pysnmp.proto.api import v2c
class SnmpClient(object): class SnmpClient(object):
def __init__(self, oneliner_cmdgen, host, port, community, timeout, def __init__(self, oneliner_cmdgen, host, port, **snmp_options):
retries):
self.host = host self.host = host
self.port = port self.port = port
self.community = community self.snmp_version = snmp_options.get('snmp_version')
self.timeout = timeout
self.retries = retries # SNMPv1/v2c options
self.community = snmp_options.get('community')
# SNMPv3 options
self.context_engine_id = snmp_options.get('context_engine_id')
if self.context_engine_id:
self.context_engine_id = v2c.OctetString(
hexValue=self.context_engine_id
)
self.context_name = snmp_options.get('context_name', '')
self.user = snmp_options.get('user')
self.auth_key = snmp_options.get('auth_key')
self.auth_protocol = auth_protocols[snmp_options.get('auth_protocol')
or 'NONE']
self.priv_key = snmp_options.get('priv_key')
self.priv_protocol = priv_protocols[snmp_options.get('priv_protocol')
or 'NONE']
self.timeout = snmp_options.get('timeout')
self.retries = snmp_options.get('retries')
if self.snmp_version is None:
if self.user is not None:
self.snmp_version = 3
else:
self.snmp_version = 0
cmdgen = oneliner_cmdgen cmdgen = oneliner_cmdgen
self.command_generator = cmdgen.CommandGenerator() self.command_generator = cmdgen.CommandGenerator()
self.community_data = cmdgen.CommunityData(self.community)
if self.snmp_version < 3:
self.auth_data = cmdgen.CommunityData(
self.community, mpModel=self.snmp_version
)
else:
self.auth_data = cmdgen.UsmUserData(
self.user,
self.auth_key, self.priv_key,
self.auth_protocol, self.priv_protocol
)
self.transport = cmdgen.UdpTransportTarget((self.host, self.port), self.transport = cmdgen.UdpTransportTarget((self.host, self.port),
timeout=self.timeout, timeout=self.timeout,
retries=self.retries) retries=self.retries)
def get_one(self, oid): def get_one(self, oid):
error_indication, error_status, error_index, var_binds = \ (error_indication,
self.command_generator.getCmd(self.community_data, error_status,
self.transport, error_index,
oid) var_binds) = self.command_generator.getCmd(
self.auth_data, self.transport, oid,
contextEngineId=self.context_engine_id,
contextName=self.context_name
)
self._handle_error_indication(error_indication) self._handle_error_indication(error_indication)
@ -43,10 +87,14 @@ class SnmpClient(object):
return val return val
def get_next(self, oid): def get_next(self, oid):
error_indication, error_status, error_index, var_binds = \ (error_indication,
self.command_generator.nextCmd(self.community_data, error_status,
self.transport, error_index,
oid) var_binds) = self.command_generator.nextCmd(
self.auth_data, self.transport, oid,
contextEngineId=self.context_engine_id,
contextName=self.context_name
)
self._handle_error_indication(error_indication) self._handle_error_indication(error_indication)
for varBindTableRow in var_binds: for varBindTableRow in var_binds:
@ -54,10 +102,14 @@ class SnmpClient(object):
return name, val return name, val
def set(self, oid, value): def set(self, oid, value):
error_indication, error_status, error_index, var_binds = \ (error_indication,
self.command_generator.setCmd(self.community_data, error_status,
self.transport, error_index,
(oid, value)) var_binds) = self.command_generator.setCmd(
self.auth_data, self.transport, (oid, value),
contextEngineId=self.context_engine_id,
contextName=self.context_name
)
self._handle_error_indication(error_indication) self._handle_error_indication(error_indication)

View File

@ -82,7 +82,8 @@ class SnmpServiceMessageReceivedTest(unittest.TestCase):
self.snmp_engine = create_snmp_engine(self.power_unit_mock, self.snmp_engine = create_snmp_engine(self.power_unit_mock,
'127.0.0.1', 161, '127.0.0.1', 161,
'community') snmp_versions=['1', '2c'],
community='community')
def tearDown(self): def tearDown(self):
self.snmp_engine.transportDispatcher.closeDispatcher() self.snmp_engine.transportDispatcher.closeDispatcher()

View File

@ -80,11 +80,11 @@ class TestSnmpClient(base.TestCase):
timeout=sentinel.timeout, timeout=sentinel.timeout,
retries=sentinel.retries) retries=sentinel.retries)
self.oneliner_cmdgen.CommunityData\ self.oneliner_cmdgen.CommunityData\
.assert_called_with(sentinel.community) .assert_called_with(sentinel.community, mpModel=0)
self.command_generator_mock.getCmd.assert_called_with( self.command_generator_mock.getCmd.assert_called_with(
sentinel.community_data, sentinel.community_data,
sentinel.udp_transport_target, sentinel.udp_transport_target,
oid oid, contextEngineId=None, contextName=''
) )
def test_get_with_all_possible_error_indications(self): def test_get_with_all_possible_error_indications(self):
@ -111,11 +111,12 @@ class TestSnmpClient(base.TestCase):
timeout=sentinel.timeout, timeout=sentinel.timeout,
retries=sentinel.retries) retries=sentinel.retries)
self.oneliner_cmdgen.CommunityData\ self.oneliner_cmdgen.CommunityData\
.assert_called_with(sentinel.community) .assert_called_with(sentinel.community, mpModel=0)
self.command_generator_mock.setCmd.assert_called_with( self.command_generator_mock.setCmd.assert_called_with(
sentinel.community_data, sentinel.community_data,
sentinel.udp_transport_target, sentinel.udp_transport_target,
(oid, '43 thousands') (oid, '43 thousands'),
contextEngineId=None, contextName=''
) )
def test_set_no_such_instance(self): def test_set_no_such_instance(self):
@ -132,12 +133,13 @@ class TestSnmpClient(base.TestCase):
retries=sentinel.retries) retries=sentinel.retries)
self.oneliner_cmdgen.CommunityData \ self.oneliner_cmdgen.CommunityData \
.assert_called_with(sentinel.community) .assert_called_with(sentinel.community, mpModel=0)
self.command_generator_mock.setCmd.assert_called_with( self.command_generator_mock.setCmd.assert_called_with(
sentinel.community_data, sentinel.community_data,
sentinel.udp_transport_target, sentinel.udp_transport_target,
(oid, '43 thousands') (oid, '43 thousands'),
contextEngineId=None, contextName=''
) )
def test_set_with_all_possible_error_indications(self): def test_set_with_all_possible_error_indications(self):