import ipaddress

from ansible.errors import AnsibleError, AnsibleParserError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display

import ldap

display = Display()


def decode_object(object):
    """Decode every attribute value of an LDAP entry from UTF-8 bytes to str.

    ``object`` maps attribute names to lists of bytes values; the result has
    the same keys, each mapped to the list of decoded strings.
    """
    return {
        attribute: [value.decode('utf-8') for value in object[attribute]]
        for attribute in object
    }


class LookupModule(LookupBase):
    """Lookup plugin answering queries from the LDAP directory.

    The first term selects the kind of query (see ``run``); the remaining
    terms are its arguments.
    """

    def __init__(self, **kwargs):
        """Initialise the lookup base class and prepare the LDAP connection."""
        # Let LookupBase set up its own state (loader, templar, options...).
        super().__init__(**kwargs)
        self.base = ldap.initialize('ldaps://localhost:1636/')
        # NOTE(review): OPT_X_TLS_ALLOW presumably tolerates a server
        # certificate that cannot be verified -- confirm this is intended.
        self.base.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
        # NOTE(review): presumably needed so that the TLS option above is
        # taken into account by the connection -- confirm against python-ldap.
        self.base.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
        self.base_dn = 'dc=crans,dc=org'

    def _search(self, *args):
        """Run a search and return its list of ``(dn, attributes)`` pairs.

        ``args`` are passed unchanged to ``self.base.search``.
        """
        query_id = self.base.search(*args)
        return self.base.result(query_id)[1]

    def _first_entry(self, *args):
        """Return the attributes of the first entry matched by a search.

        Raises AnsibleError when the search matches nothing.
        """
        entries = self._search(*args)
        if not entries:
            raise AnsibleError(
                f"ldap lookup: no entry found for {' '.join(map(str, args[:1] + args[2:]))}"
            )
        return entries[0][1]

    def _vlan_name(self, vlan):
        """Return the name of a vlan given either its name or its integer id.

        An integer is resolved through the network entry whose
        ``description`` equals that id; anything else is returned unchanged.
        """
        if not isinstance(vlan, int):
            return vlan
        network = self._first_entry(
            f"ou=networks,{self.base_dn}",
            ldap.SCOPE_ONELEVEL,
            f"description={vlan}",
        )
        return network['cn'][0].decode('utf-8')

    def _host_attribute(self, host, vlan, attr):
        """Return the decoded values of ``attr`` on an interface of a device.

        The interface entry is ``cn=HOST.VLAN.crans.org`` below the device
        entry, except for the 'srv' vlan whose interface has no vlan part.
        Raises AnsibleError when the entry or the attribute is missing.
        """
        vlan = self._vlan_name(vlan)
        if vlan == 'srv':
            interface = f"{host}.crans.org"
        else:
            interface = f"{host}.{vlan}.crans.org"
        entry = self._first_entry(
            f"cn={interface},cn={host},ou=hosts,{self.base_dn}",
            ldap.SCOPE_BASE,
        )
        try:
            values = entry[attr]
        except KeyError:
            raise AnsibleError(
                f"ldap lookup: {interface} has no attribute {attr!r}"
            ) from None
        return [value.decode('utf-8') for value in values]

    def query(self, base, scope, filter='(objectClass=*)', attr=None):
        """
        Run an LDAP query
        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        BASE: base dn
        SCOPE: 'base', 'one' or 'sub'
        FILTER: ldap filter (optional)
        ATTR: list of attributes (optional)

        Returns a dict mapping each matched dn to its decoded attributes.
        Raises AnsibleError when SCOPE is not one of the accepted values.
        """
        scopes = {
            'base': ldap.SCOPE_BASE,
            'one': ldap.SCOPE_ONELEVEL,
            'sub': ldap.SCOPE_SUBTREE,
        }
        try:
            ldap_scope = scopes[scope]
        except KeyError:
            raise AnsibleError(
                f"ldap lookup: invalid scope {scope!r}, expected one of "
                f"{', '.join(scopes)}"
            ) from None
        # The f-string turns whatever was given as base into a plain str.
        entries = self._search(f"{base}", ldap_scope, filter, attr)
        return {dn: decode_object(entry) for dn, entry in entries}

    def ip(self, host, vlan):
        """
        Retrieve IP addresses of an interface of a device
        query('ldap', 'ip', HOST, VLAN)

        VLAN is either the vlan name or its integer id.
        """
        return self._host_attribute(host, vlan, 'ipHostNumber')

    def run(self, terms, variables=None, **kwargs):
        """Dispatch a lookup on its first term.

        Supported query types and their return values:
          'query', BASE, SCOPE[, FILTER[, ATTR]]: dict of dn -> attributes
          'ip', HOST, VLAN: list of IP addresses of the interface
          'group': raw (dn, attributes) pairs of the posixGroup entries
              (attribute values are left as returned by the server)
          'hosts', HOST, VLAN, ATTR: list of values of ATTR on the interface
          'networks', NETWORK: one-element list holding the network in
              address/prefix notation
          'vlanid', NETWORK: the ``description`` of the network as an int
          'role', ROLE: list of the cn values of the hosts with that role

        The return shapes are kept as they were for backward compatibility,
        even where they are not lists.
        Raises AnsibleError for a missing or unknown query type.
        """
        if not terms:
            raise AnsibleError("ldap lookup: missing query type")
        kind = terms[0]

        if kind == 'query':
            return self.query(*terms[1:])

        if kind == 'ip':
            return self.ip(*terms[1:])

        if kind == 'group':
            return self._search(
                f"ou=group,{self.base_dn}",
                ldap.SCOPE_SUBTREE,
                "objectClass=posixGroup",
            )

        # query interface attribute
        # query('ldap', 'hosts', HOST, VLAN, ATTR)
        # HOST: device name
        # VLAN: vlan name
        # ATTR: attribute
        if kind == 'hosts':
            return self._host_attribute(terms[1], terms[2], terms[3])

        if kind == 'networks':
            network = self._first_entry(
                f"cn={terms[1]},ou=networks,{self.base_dn}",
                ldap.SCOPE_BASE,
                "objectClass=ipNetwork",
            )
            number = network['ipNetworkNumber'][0].decode('utf-8')
            netmask = network['ipNetmaskNumber'][0].decode('utf-8')
            return [str(ipaddress.ip_network(f"{number}/{netmask}"))]

        if kind == 'vlanid':
            network = self._first_entry(
                f"cn={terms[1]},ou=networks,{self.base_dn}",
                ldap.SCOPE_BASE,
                "objectClass=ipNetwork",
            )
            return int(network['description'][0])

        if kind == 'role':
            hosts = self._search(
                f"ou=hosts,{self.base_dn}",
                ldap.SCOPE_ONELEVEL,
                f"description={terms[1]}",
            )
            return [
                cn.decode('utf-8')
                for _dn, attributes in hosts
                for cn in attributes['cn']
            ]

        raise AnsibleError(f"ldap lookup: unknown query type {kind!r}")