上上周接到需求,要将软件实现与客户AD域的信息进行对接。
需求如下:
1、在软件注册用户的页面新建一个功能,能够查询该用户名是否在AD域内存在。
2、用户登陆软件的时候对输入的账号密码与AD域内的账号密码进行校验,能够实时以AD域的账号密码登陆。
公司的考勤软件使用Django框架开发,用户登陆这一块用的是Django本身‘密码MD5加密’—‘保存数据库’—‘登陆时进行比对’的逻辑。
看到需求,直接否认了这种Django本身的逻辑,开玩笑,几千人的密码要进行实时更新,对服务器的压力也太大了,而且windows的官方文档写的清清楚楚:AD域不会提供密码。
Python对接AD域自然要用到ldap库,安装ldap库的时候最开始使用pip安装,后来软件报错才发现安装错误,应该安装python-ldap,而不是ldap。后来找到64位的安装包才安装成功。
(windows使用python有时候会有很多坑,因为有些库是依赖于VS,比较麻烦。)
ldap提供了接口可以供我们对AD域内的信息进行比对,只用修改Django的登陆逻辑,每次登陆的时候调用接口,和AD域内的信息进行比对就行了。
代码如下:
- # -*- coding: UTF-8 -*-
- import sys
- reload(sys)
- sys.setdefaultencoding('utf-8')
-
- import ldap,logging,time
- logfile = 'e:\\a.txt'
- # logging.basicConfig(filename=logfile,level=logging.INFO)
- # logging.basicConfig(format='%(time.asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
- logging.basicConfig(level=logging.INFO,
- #format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', #返回值:Thu, 26 May 2016 15:09:31 t11.py[line:92] INFO
- format='%(asctime)s %(levelname)s %(message)s',
- #datefmt='%a, %d %b %Y %H:%M:%S',
- #datefmt='%Y/%m/%d %I:%M:%S %p', #返回2016/05/26 03:12:56 PM
- datefmt='%Y-%m-%d %H:%M:%S', #返回2016/05/26 03:12:56 PM
- filename=logfile#,
- #filemode='a' #默认为a
- )
- #logging输出结果:
- #2016-05-26 15:22:29 INFO liu1 valid passed.
- #2016-05-26 15:22:37 INFO liu1 valid passed.
-
-
- class ldapc:
- def __init__(self,ldap_path,baseDN,ldap_authuser,ldap_authpass):
- self.baseDN = baseDN
- self.ldap_error = None
- self.l=ldap.initialize(ldap_path)
- self.l.protocol_version = ldap.VERSION3
- print 'url:',ldap_path
- print 'username:',ldap_authuser
- print 'password:',ldap_authpass
- try:
- res=self.l.simple_bind_s(ldap_authuser,ldap_authpass)
- print res
- except ldap.LDAPError,err:
- self.ldap_error = 'Connect to %s failed, Error:%s.' %(ldap_path,err.message['desc'])
- print self.ldap_error
- # finally:
- # self.l.unbind_s()
- # del self.l
-
- def search_users(self,username): #模糊查找,返回一个list,使用search_s()
- if self.ldap_error is None:
- try:
- searchScope = ldap.SCOPE_SUBTREE
- searchFiltername = "sAMAccountName" #通过samaccountname查找用户
- retrieveAttributes = None
- searchFilter = '(' + searchFiltername + "=" + username +'*)'
- ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)
- if len(ldap_result) == 0: #ldap_result is a list.
- return "%s doesn't exist." %username
- else:
- # result_type, result_data = self.l.result(ldap_result, 0)
- # return result_type, ldap_result
- return ldap_result
- except ldap.LDAPError,err:
- return err
-
- def search_user(self,username): #精确查找,返回值为list,使用search()
- if self.ldap_error is None:
- try:
- searchScope = ldap.SCOPE_SUBTREE
- searchFiltername = "sAMAccountName" #通过samaccountname查找用户
- retrieveAttributes = None
- searchFilter = '(' + searchFiltername + "=" + username +')'
- ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
- result_type, result_data = self.l.result(ldap_result_id, 0)
- if result_type == ldap.RES_SEARCH_ENTRY:
- return result_data
- else:
- return "%s doesn't exist." %username
- except ldap.LDAPError,err:
- return err
-
- def search_userDN(self,username): #精确查找,最后返回该用户的DN值
- if self.ldap_error is None:
- try:
- searchScope = ldap.SCOPE_SUBTREE
- searchFiltername = "sAMAccountName" #通过samaccountname查找用户
- retrieveAttributes = None
- searchFilter = '(' + searchFiltername + "=" + username +')'
- ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
- result_type, result_data = self.l.result(ldap_result_id, 0)
- if result_type == ldap.RES_SEARCH_ENTRY:
- print 'flag:',1
- return result_data[0][0] #list第一个值为用户的DN,第二个值是一个dict,包含了用户属性信息
- else:
- print 'flag:',0
- return "%s doesn't exist." %username
- except ldap.LDAPError,err:
- return err
-
- def valid_user(self,username,userpassword): #验证用户密码是否正确
- if self.ldap_error is None:
- target_user = self.search_userDN(username) #使用前面定义的search_userDN函数获取用户的DN
- if target_user.find("doesn't exist") == -1:
- try:
- self.l.simple_bind_s(target_user,userpassword)
- logging.info('%s valid passed.\r'%(username)) #logging会自动在每行log后面添加"\000"换行,windows下未自动换行
- return True
- except ldap.LDAPError,err:
- return err
- else:
- return target_user
-
- def update_pass(self,username,oldpassword,newpassword): #####未测试#########
- if self.ldap_error is None:
- target_user = self.search_userDN(username)
- if target_user.find("doesn't exist") == -1:
- try:
- self.l.simple_bind_s(target_user,oldpassword)
- self.l.passwd_s(target_user,oldpassword,newpassword)
- return 'Change password success.'
- except ldap.LDAPError,err:
- return err
- else:
- return target_user
-
-
-
- ldap_authuser='xxx@xxxx.com'
- ldap_authpass='xxx'
- domainname='xxx'
- ldappath='ldap://xxx.xxx.xxx.xxx:xxx'
-
- baseDN='DC=xxx,DC=com' #ldap_authuser在连接到LDAP的时候不会用到baseDN,在验证其他用户的时候才需要使用
- username = 'xxx' #要查找/验证的用户
- p=ldapc(ldappath,baseDN,ldap_authuser,ldap_authpass)
- # print 'list--search:',p.search_users(username)
- print 'DN----search',p.search_userDN(username)
- print 'DN-ty',type(p.search_userDN(username))
- print 'user--valid',p.valid_user(ldap_authuser,ldap_authpass) #调用valid_user()方法验证用户是否为合法用户
在这里只要配置好AD域的域名、URL地址、管理员权限的账号密码,就直接调用接口了。
PS:如果调用模糊查找里面的ldap.search_s()方法,一直在报错。但是用ldap.search()方法就没有问题。