17
17
import ldap
18
18
from ldap .controls import DecodeControlTuples , RequestControl
19
19
from ldap .extop import ExtendedRequest
20
+ from ldap .extop .passwd import PasswordModifyResponse
20
21
from ldap .ldapobject import SimpleLDAPObject , NO_UNIQUE_ENTRY
21
22
from ldap .response import (
22
23
Response ,
23
- SearchEntry , SearchReference , SearchResult ,
24
+ SearchEntry , SearchReference ,
24
25
IntermediateResponse , ExtendedResult ,
25
26
)
26
27
@@ -32,10 +33,15 @@ class Connection(SimpleLDAPObject):
32
33
resp_ctrl_classes = None
33
34
34
35
def result (self , msgid : int = ldap .RES_ANY , * , all : int = 1 ,
35
- timeout : Optional [float ] = None ) -> Optional [list [Response ]]:
36
+ timeout : Optional [float ] = None ,
37
+ defaultIntermediateClass :
38
+ Optional [type [IntermediateResponse ]] = None ,
39
+ defaultExtendedClass : Optional [type [ExtendedResult ]] = None
40
+ ) -> Optional [list [Response ]]:
36
41
"""
37
- result([msgid: int = RES_ANY [, all: int = 1 [, timeout :
38
- Optional[float] = None]]]) -> Optional[list[Response]]
42
+ result([msgid: int = RES_ANY [, all: int = 1 [,
43
+ timeout: Optional[float] = None]]])
44
+ -> Optional[list[Response]]
39
45
40
46
This method is used to wait for and return the result of an
41
47
operation previously initiated by one of the LDAP asynchronous
@@ -87,13 +93,26 @@ def result(self, msgid: int = ldap.RES_ANY, *, all: int = 1,
87
93
88
94
results = []
89
95
for msgid , msgtype , controls , data in messages :
90
- controls = DecodeControlTuples (controls , self .resp_ctrl_classes )
96
+ if controls is not None :
97
+ controls = DecodeControlTuples (controls , self .resp_ctrl_classes )
91
98
99
+ if msgtype == ldap .RES_INTERMEDIATE :
100
+ data ['defaultClass' ] = defaultIntermediateClass
101
+ if msgtype == ldap .RES_EXTENDED :
102
+ data ['defaultClass' ] = defaultExtendedClass
92
103
m = Response (msgid , msgtype , controls , ** data )
93
104
results .append (m )
94
105
95
106
return results
96
107
108
+ def add_s (self , dn : str ,
109
+ modlist : list [tuple [str , Union [bytes , list [bytes ]]]], * ,
110
+ ctrls : RequestControls = None ) -> ldap .response .AddResult :
111
+ msgid = self .add_ext (dn , modlist , serverctrls = ctrls )
112
+ responses = self .result (msgid )
113
+ result , = responses
114
+ return result
115
+
97
116
def bind_s (self , dn : Optional [str ] = None ,
98
117
cred : Union [None , str , bytes ] = None , * ,
99
118
method : int = ldap .AUTH_SIMPLE ,
@@ -119,17 +138,36 @@ def delete_s(self, dn: str, *,
119
138
result , = responses
120
139
return result
121
140
122
- def extop_s (self , oid : Optional [str ] = None ,
141
+ def extop_s (self , name : Optional [str ] = None ,
123
142
value : Optional [bytes ] = None , * ,
124
143
request : Optional [ExtendedRequest ] = None ,
125
- ctrls : RequestControls = None
144
+ ctrls : RequestControls = None ,
145
+ defaultIntermediateClass : Optional [type [IntermediateResponse ]] = None ,
146
+ defaultExtendedClass : Optional [type [ExtendedResult ]] = None
126
147
) -> list [Union [IntermediateResponse , ExtendedResult ]]:
127
148
if request is not None :
128
- oid = request .requestName
149
+ name = request .requestName
129
150
value = request .encodedRequestValue ()
130
151
131
- msgid = self .extop (oid , value , serverctrls = ctrls )
132
- return self .result (msgid )
152
+ msgid = self .extop (name , value , serverctrls = ctrls )
153
+ return self .result (msgid ,
154
+ defaultIntermediateClass = defaultIntermediateClass ,
155
+ defaultExtendedClass = defaultExtendedClass )
156
+
157
+ def modify_s (self , dn : str ,
158
+ modlist : list [tuple [str , Union [bytes , list [bytes ]]]], * ,
159
+ ctrls : RequestControls = None ) -> ldap .response .ModifyResult :
160
+ msgid = self .modify_ext (dn , modlist , serverctrls = ctrls )
161
+ responses = self .result (msgid )
162
+ result , = responses
163
+ return result
164
+
165
+ def passwd_s (self , user : Optional [str ] = None ,
166
+ oldpw : Optional [bytes ] = None , newpw : Optional [bytes ] = None ,
167
+ ctrls : RequestControls = None ) -> PasswordModifyResponse :
168
+ msgid = self .passwd (user , oldpw , newpw , serverctrls = ctrls )
169
+ res , = self .result (msgid , defaultExtendedClass = PasswordModifyResponse )
170
+ return res
133
171
134
172
def search_s (self , base : Optional [str ] = None ,
135
173
scope : int = ldap .SCOPE_SUBTREE ,
@@ -147,8 +185,11 @@ def search_s(self, base: Optional[str] = None,
147
185
attrsonly = attrsonly , serverctrls = ctrls ,
148
186
sizelimit = sizelimit , timeout = timelimit )
149
187
result = self .result (msgid , timeout = timeout )
188
+ # FIXME: we want a better way of returning a result with multiple
189
+ # messages, always useful in searches but other operations can also
190
+ # elicit those (by way of an IntermediateResponse)
150
191
result [- 1 ].raise_for_result ()
151
- return result [: - 1 ]
192
+ return result
152
193
153
194
def search_subschemasubentry_s (
154
195
self , dn : Optional [str ] = None ) -> Optional [str ]:
@@ -212,6 +253,6 @@ def find_unique_entry(self, base: Optional[str] = None,
212
253
r = self .search_s (base , scope , filter , attrlist = attrlist ,
213
254
attrsonly = attrsonly , ctrls = ctrls , timeout = timeout ,
214
255
sizelimit = 2 )
215
- if len (r ) != 1 :
256
+ if len (r ) != 2 :
216
257
raise NO_UNIQUE_ENTRY (f'No or non-unique search result for { filter } ' )
217
258
return r [0 ]
0 commit comments