ldap3 官方文档学习之增删改查操作

1,773 阅读4分钟

前言

公司部门培训用到 ldap3,布置了个作业,于是开始看官方文档学习中。我是直接从 LDAP Operations 部分开始看的。

主要就是官方文档提供了增删改查的接口,需要看懂函数和参数,然后就会用了。

增加操作

官方 add 函数
def add(self,
        dn,
        object_class=None,
        attributes=None,
        controls=None)

逐个参数解释:

  • dn:标识要添加的目标名字

  • object_class:要添加的标志类名称,可以是包含一个单一值或一串字符串

  • attributes:一个以 {‘attr1’: ‘val1’, ‘attr2’: ‘val2’, …} or {‘attr1’: [‘val1’, ‘val2’, …], …} 多值形式的字典

  • controls:发送请求额外的信息

举例
# import class and constants
from ldap3 import Server, Connection, ALL

# define the server
s = Server('servername', get_info=ALL)  # define an unsecure LDAP server, requesting info on DSE and schema

# define the connection
c = Connection(s, user='user_dn', password='user_password')

# perform the Add operation
c.add('cn=user1,ou=users,o=company', ['inetOrgPerson', 'posixGroup', 'top'], {'sn': 'user_sn', 'gidNumber': 0})
# equivalent to 等同上面
c.add('cn=user1,ou=users,o=company', attributes={'objectClass':  ['inetOrgPerson', 'posixGroup', 'top'], 'sn': 'user_sn', gidNumber: 0})

print(c.result)

# close the connection
c.unbind()

主要就是 add 函数传三个参数:dn、object_class、attributes。

  • dn 包含用户cn、ou、o等信息

删除操作

官方 delete 函数
def delete(self,
           dn,
           controls=None):

逐个参数解释:

  • dn:标识要删除的目标名字
  • controls:发送请求额外的信息
举例
from ldap3 import Server, Connection, ALL

# define the server
s = Server('servername', get_info=ALL)  # define an unsecure LDAP server, requesting info on DSE and schema

# define the connection
c = Connection(s, user='user_dn', password='user_password')

# perform the Delete operation
c.delete('cn=user1,ou=users,o=company')
print(c.result)

# close the connection
c.unbind()

主要就是 delete 函数传一个参数:dn。

  • dn 包含用户cn、ou、o等信息

修改操作

官方 modify 函数
def modify(self,
           dn,
           changes,
           controls=None):

逐个参数解释:

  • dn:标识要删除的目标名字
  • changes:一个要被展示在具体入口的修改的字典
  • controls:发送请求额外的信息
举例
# import class and constants
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE

# define the server
s = Server('servername', get_info=ALL)  # define an unsecure LDAP server, requesting info on DSE and schema

# define the connection
c = Connection(s, user='user_dn', password='user_password')
c.bind()

# perform the Modify operation
c.modify('cn=user1,ou=users,o=company',
         {'givenName': [(MODIFY_REPLACE, ['givenname-1-replaced'])],
          'sn': [(MODIFY_REPLACE, ['sn-replaced'])]})
print(c.result)

# close the connection
c.unbind()

主要就是 modify 函数传两个参数:dn、changes。

  • dn 包含用户cn、ou、o等信息
  • changes 包含修改前的数据类别和修改后的数据

查询操作

官方 search 函数
def search(self,
           search_base,
           search_filter,
           search_scope=SUBTREE,
           dereference_aliases=DEREF_ALWAYS,
           attributes=None,
           size_limit=0,
           time_limit=0,
           types_only=False,
           get_operational_attributes=False,
           controls=None,
           paged_size=None,
           paged_criticality=False,
           paged_cookie=None):

逐个参数解释:

  • search_base:查询请求的基础
  • search_filter:查询请求的过滤器,必须服从 LDAP 过滤语法 RFC4515 标准
  • search_scope:具体指定查询内容的部分
    • BASE:查询 search_base 中指定的条目的属性。
    • LEVEL:查询 search_base 中包含的条目的属性。基对象必须引用一个容器对象。
    • SUBTREE:向下查询 search_base 和所有附属容器中指定的条目的属性。
  • attributes:查询返回的单个属性或属性列表(默认为None)。如果属性为None,则不返回任何属性。如果属性是 ALL_ATTRIBUTES 或 ALL_OPERATIONAL_ATTRIBUTES,则返回所有用户属性或所有操作属性。
举例
from ldap3 import Server, Connection, SUBTREE
total_entries = 0
server = Server('test-server')
c = Connection(server, user='username', password='password')
c.search(search_base = 'o=test',
         search_filter = '(objectClass=inetOrgPerson)',
         search_scope = SUBTREE,
         attributes = ['cn', 'givenName'],
         paged_size = 5)
total_entries += len(c.response)
for entry in c.response:
    print(entry['dn'], entry['attributes'])
cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
while cookie:
    c.search(search_base = 'o=test',
             search_filter = '(objectClass=inetOrgPerson)',
             search_scope = SUBTREE,
             attributes = ['cn', 'givenName'],
             paged_size = 5,
             paged_cookie = cookie)
    total_entries += len(c.response)
    cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
    for entry in c.response:
        print(entry['dn'], entry['attributes'])
print('Total entries retrieved:', total_entries)

主要就是 search 函数传四个参数:search_base、search_filter、search_scope、attributes。

  • search_base:一般就是之前三个操作都要用到的 dn
  • search_filter:默认为 '(objectClass=inetOrgPerson)'
  • search_scope:我用的是 SUBTREE
  • attributes:我用的是 ALL_ATTRIBUTES

增删改查完整版代码

增删改查操作都是从 Excel 表格中读取数据的。

# coding: utf8
import sys
import json
from ldap3 import Connection, Server, ALL, MODIFY_ADD, MODIFY_REPLACE, SUBTREE, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
import utils
from config import BASE_DN, LDAP_TEST_CONFIG, LDAP_PROD_CONFIG, GROUP_DNS

host = "xxx.xxx.xxx.xxx"
port = xxx
user = "cn=xxx,dc=xx,dc=xxx"
password = "xxx"


# 创建连接
server = Server(host=host, port=port, get_info=ALL)
conn = Connection(server=server,
                  auto_bind=True,
                  read_only=False,
                  fast_decoder=True,
                  check_names=True,
                  user=user, password=password)


# 检查连接是否成功
def test_connection():
    print(server.info)
    print(conn.user)
    print(conn.extend.standard.who_am_i())


# 获取用户
def get_users():
    conn.search(search_base="dc=xxx,dc=xxx", attributes=ALL_ATTRIBUTES,
                search_filter='(objectclass=person)')
    print(conn.result)
    res = conn.response_to_json()
    res = json.loads(res)['entries']
    return res


print("===================测试链接====================")
test_connection()
print("\n\n\n===================打印用户====================")
print(get_users())


ldap_config = {}

excel_file_path = "ldap-users-example.xlsx"
ldap_user_excel_file = excel_file_path


# 批量添加组织人员
def add_users():
    # 读取用户excel信息
    userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
    # print(userattrs)

    print("\n\n\n===================开始添加用户========================")
    for user_dn, userattr in userattrs.items():
        # 添加用户
        conn.add(user_dn, ['top', 'inetOrgPerson', 'posixAccount'], userattr)
        res = conn.result
        # 用户已存在
        if res['result'] != 0:
            msg = res['description']
            print("add user failed:%s      res: %s" % (user_dn, res))
            continue
        # 添加用户到用户组,直接添加到 cn = xx组中,没有这个操作的话就是不添加到 cn 中
        for GROUP_DN in GROUP_DNS:
            conn.modify(GROUP_DN, {'uniqueMember': [(MODIFY_ADD, user_dn)]})
            # print("====modify result=====")
            res = conn.result
            msg = "success"
            if res['result'] != 0:
                msg = res['message']
                print("add user to group:%s                  res: %s" % (GROUP_DN, msg))
                continue
        print(user_dn, "success")
    print("===================添加用户完毕========================")


# 批量删除组织人员
def delete_users():
    # 读取用户excel信息
    userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
    # print(userattrs)

    print("\n\n\n===================开始删除用户========================")
    for user_dn, userattr in userattrs.items():
        # 删除用户
        conn.delete(user_dn)
        res = conn.result
        print(user_dn, "success")
    print("===================删除用户完毕========================")


# 修改人员信息
def modify_users():
    # 读取用户excel信息
    userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
    # print(userattrs)

    print("\n\n\n===================开始修改用户信息========================")
    for user_dn, userattr in userattrs.items():
        # 修改用户部门和工作地点
        conn.modify(user_dn, {'physicalDeliveryOfficeName': [(MODIFY_REPLACE, ['武汉研发组'])],
          'l': [(MODIFY_REPLACE, ['武汉'])]})
        res = conn.result
        print(user_dn, "success")
    print("===================修改用户信息完毕========================")


# 查询人员信息
def search_users():
    # 读取用户excel信息
    userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file)
    # print(userattrs)

    print("\n\n\n===================开始查询用户信息========================")
    for user_dn, userattr in userattrs.items():
        # 查询用户
        print(user_dn)
        status = conn.search(search_base = user_dn,
                             search_filter = '(objectClass=inetOrgPerson)',
                             search_scope = SUBTREE,
                             attributes = ALL_ATTRIBUTES)
        if status:
            print(user_dn, "success")
        else:
            print(user_dn, "failed")
    print("===================查询用户信息完毕========================")


test_connection()
get_users()
search_users()
add_users()
delete_users()
modify_users()
search_users()


# close the connection
conn.unbind()

参考

Ldap3 库使用方法-完整版修改连接

Ldap3 库使用方法(一)

python使用ldap3进行接口调用

对ldap实现增删改查--附demo