This DB-API-style module implements the Cursor and Connection objects. It is functional: you can create connections and cursors, call fetchone and fetchall, read rowcount, etc. It shells out to the osql or sqlcmd command-line tools instead of using ODBC or ADO. There is a good-sized section with examples to get you started.

Python, 239 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#dblib.py                                                                                                                                          
#created by Jorge Besada
#Last updated: 5/24/2010 - suggestion by Kosta Welke implemented
                                                                                                                                                   
import os,sys                                                                                                                                      
                                                                                                                                                   
class Connection:
    """Connection to a SQL Server instance, driven through osql/sqlcmd.

    No socket is held open: the object only builds the command-line prefix
    (``constr``) that every later statement is appended to, and runs one
    probe query to find out whether the server answers.

    Attributes:
        version, servername, username, password: the constructor arguments.
        defdb: default database; ``'master'`` when ``db`` is empty.
        constr: command-line prefix, ``''`` when no client tool was selected.
        connected: 1 when the probe query returned the ``sa`` login, else 0.
        cursor: a Cursor sharing ``constr``, or None when no command line
            could be built (missing/unsupported ``version``).

    NOTE(review): server name, credentials and database are concatenated
    into a shell command without escaping, and the password is passed with
    ``-P`` on the command line. Only use with trusted values.
    """

    # Supported ``version`` values and the client executable each one uses.
    _CLIENT_TOOLS = {
        "sql7": "osql",
        "sql2000": "osql",
        "sql2005": "sqlcmd",
        "sql2008": "sqlcmd",
    }

    def __init__(self, sname, uname='', password='', db='', version=''):
        """Build the command-line prefix and probe the server.

        Args:
            sname: server (and optional instance) name, passed to ``-S``.
            uname: SQL login; empty selects integrated security (``-E``).
            password: password for ``uname``.
            db: default database; empty means ``'master'``.
            version: one of ``sql7``, ``sql2000``, ``sql2005``, ``sql2008``.
                ``sybase`` is recognised but not implemented.

        Problems are reported with a printed message and leave
        ``connected`` at 0; nothing is raised for a bad ``version``.
        """
        self.version = version
        self.servername = sname
        self.username = uname
        self.password = password
        self.defdb = db or 'master'
        self.constr = ''
        self.connected = 0
        # Always defined so callers can test it instead of hitting
        # AttributeError after a failed connection.
        self.cursor = None

        # __init__ must return None; a bare return leaves the object in its
        # "not connected" state.
        if not self.version:
            print("Need to pass sql version argument")
            return
        if self.version == "sybase":
            print("Sorry, Sybase has not been implemented yet!")
            return
        execsql = self._CLIENT_TOOLS.get(self.version)
        if execsql is None:
            print("Unsupported sql version: " + str(self.version))
            return

        if uname == '':
            auth = " -E"
        else:
            auth = " -U" + self.username + " -P" + self.password
        self.constr = (execsql + auth + " -S" + self.servername +
                       " -d" + self.defdb + " /w 8192 ")

        c = Cursor()
        c.servername = sname
        c.username = uname
        c.password = password
        c.defdb = self.defdb
        c.constr = self.constr
        self.cursor = c

        # Test connection: with a header line and a dashed rule line in
        # front, the login name is expected on the third output line.
        s = "set nocount on select name from master..syslogins where name = 'sa'"
        pipe = os.popen(self.constr + ' -Q' + '"' + s + '"')
        try:
            lst = pipe.readlines()
        finally:
            pipe.close()

        if len(lst) > 2:
            if lst[2].strip() == 'sa':
                self.connected = 1
        else:
            print("Could not connect")

    def commit(self):
        """Do nothing; present only for DB-API compatibility."""
        pass

    def close(self):
        """Mark the connection as closed and return None.

        There is no persistent process or socket to release.
        """
        self.connected = 0
        return None
                                                                                                                                                   
                                                                                                                                                   
class Cursor:
    """Runs statements through osql/sqlcmd and exposes the parsed output.

    Each ``execute`` starts a new client process using ``constr`` (the
    command-line prefix, normally copied from a Connection), reads all of
    its output and splits it into rows on ``colseparator``.

    Attributes:
        constr: command-line prefix the statement is appended to.
        sqlfile: client option placed before the statement (``-Q``).
        colseparator: column separator handed to the client with ``-s``.
        rowcount: count from the trailing ``(N rows affected)`` line,
            0 when the output has no such line, -1 when there is no output.
        records: unsplit result lines (raw output lines when no row count
            trailer was found).
        rowid: index of the next row to be fetched.
        fieldnames: column names taken from the first output line.
        description: tuple of 7-item tuples, one per column; only the name
            is filled in, the remaining items are 0.

    Fetched values are strings; only the ends of each line are stripped,
    so inner columns keep whatever padding the client emitted.

    NOTE(review): the statement is pasted between double quotes into a
    shell command line without escaping. Do not pass untrusted text.
    """

    def __init__(self):
        self.defdb = ''
        self.servername = ''
        self.username = ''
        self.password = ''
        self.constr = ''
        self.rowcount = -1
        self.records = []
        self.rowid = 0
        self.sqlfile = "-Q"
        self.colseparator = chr(1)  # default column separator
        # this is going to be a sequence of sequences, each one with:
        # name, type_code, display_size, internal_size, precision, scale, null_ok
        self.description = []
        self.fieldnames = []
        self.fieldvalues = []
        self.fieldvalue = []
        # template listing the per-column description fields
        self.dictfield = {'name': '', 'type_code': 0, 'display_size': 0,
                          'internal_size': 0, 'precision': 0, 'scale': 0,
                          'null_ok': 0}
        self.dictfields = []

    # This is for compatibility, to allow both types of calls:
    # cursor = connection.cursor() or cursor = connection.cursor
    def __call__(self):
        """Return a new cursor configured like this one.

        The connection settings are copied so the new cursor can execute
        statements; result state (records, rowcount, ...) starts empty.
        """
        clone = Cursor()
        for attr in ('servername', 'username', 'password', 'defdb',
                     'constr', 'colseparator', 'sqlfile'):
            setattr(clone, attr, getattr(self, attr))
        return clone

    def execute(self, s):
        """Run statement ``s`` and load its output into the cursor.

        Returns:
            The new ``rowcount`` (see the class docstring for its values).
        """
        command = (self.constr + ' -s' + self.colseparator + " " +
                   self.sqlfile + '"' + s + '"')
        pipe = os.popen(command)
        try:
            lines = pipe.readlines()
        finally:
            pipe.close()
        return self._load_output(lines)

    def _load_output(self, lines):
        """Reset the result state and fill it from raw client output lines."""
        self.records = []
        self.rowid = 0
        self.rowcount = -1
        self.fieldnames = []
        self.description = []
        if not lines:
            return self.rowcount

        # The row count may be in the last line, as: (4 rows affected)
        count = self._parse_rowcount(lines[-1])
        if count is None:
            # No row count trailer: keep the raw output available and
            # report zero rows.
            self.records = list(lines)
            self.rowcount = 0
            return self.rowcount
        self.rowcount = count

        # The first line carries the column names.
        self.fieldnames = [name.strip()
                           for name in lines[0].split(self.colseparator)]
        padding = (0,) * (len(self.dictfield) - 1)
        self.description = tuple((name,) + padding
                                 for name in self.fieldnames)

        # Data section: skip the header and the trailer, blank lines and
        # the dashed rule under the header.
        for line in lines[1:-1]:
            row = line.strip()
            if row and not self._is_rule_line(row):
                self.records.append(row)
        return self.rowcount

    def _parse_rowcount(self, line):
        """Return N from a '(N rows affected)' line, or None if absent."""
        if not line.startswith("("):
            return None
        digits = line[1:].split(" ", 1)[0]
        try:
            return int(digits)
        except ValueError:
            return None

    def _is_rule_line(self, row):
        """Tell whether ``row`` holds only dashes, separators and spaces."""
        core = row.replace(self.colseparator, '').replace(' ', '')
        return core != '' and core.strip('-') == ''

    def fetchone(self):
        """Return the next row as a tuple of strings, or None at the end."""
        records = self.records or []
        if self.rowid >= len(records):
            return None
        row = tuple(records[self.rowid].split(self.colseparator))
        self.rowid += 1
        return row

    def fetchall(self):
        """Return all remaining rows as a list of tuples and consume them.

        At most ``rowcount`` rows are served, so raw output kept after a
        statement without a row count trailer is not returned here.
        """
        records = self.records or []
        stop = min(self.rowcount, len(records))
        if stop <= self.rowid:
            return []
        rows = [tuple(line.split(self.colseparator))
                for line in records[self.rowid:stop]]
        self.rowid = stop
        return rows

    def close(self):
        """Drop the buffered rows and return None."""
        self.records = None
        return None
                                                                                                                                                   
#-----------------------------------------                                                                                                         
                                                                                                                                                   
#Testing harness: we create and drop logins and databases                                                                                          
#Edit connection for desired server name and security options:                                                                                     
#Sample: for local server default instance SQL2000, integrated security                                                                                                             
#   c = Connection('(local)',db='pubs', version='sql2000')                                                                                         
#For local server, SQL security                                                                                                                    
#   c = Connection('(local)','sa','sa password',db='pubs', version='sql2000')                                                                      
#These tests use a restored pubs database                                                                                          
#in a SQL2008 instance (local)\sql2008                                                                     

              
if __name__ == '__main__':
    # Self-test harness.  It needs a reachable SQL Server instance with a
    # restored pubs database; edit the Connection arguments for your server.
    #
    # The server name is a raw string: '\s' is not a recognised escape
    # sequence, so the plain literal only worked by accident and newer
    # interpreters warn about it.  The resulting string is unchanged.
    c = Connection(r'(local)\sql2008', db='pubs', version='sql2008')
    # Single-argument print(...) produces the same output whether print is
    # a statement (Python 2) or a function (Python 3).
    print("Connection string: " + c.constr)
    if c.connected == 1:
        print("Connected OK")
    cu = c.cursor

    # Each batch is executed statement by statement, printing the rowcount
    # after every statement.  The rows of the LAST statement in the batch
    # are then fetched and printed, and the connection's close() is called.
    batches = [
        # Plain select
        ['select * from authors'],
        # Several SQL statements test: create a login and check it exists
        ["sp_addlogin 'test2', 'test2'",
         "select name from master..syslogins where name = 'test2'"],
        # Drop the login and check it is gone
        ["EXEC sp_droplogin 'test2'",
         "select name from master..syslogins where name = 'test2'"],
        # Create a database and check it exists
        ["CREATE DATABASE test",
         "select name from master..sysdatabases where name = 'test'"],
        # Drop the database and check it is gone
        ["DROP DATABASE test",
         "select name from master..sysdatabases where name = 'test'"],
        # Update a row and read it back
        ["update authors set au_lname = 'Whitty' where au_id = '172-32-1176'",
         "select au_lname from authors  where au_id = '172-32-1176'"],
    ]

    for batch in batches:
        for sql in batch:
            # The return value of execute() is not needed here; results are
            # read through cu.rowcount and cu.fetchall().
            cu.execute(sql)
            print('rowcount=' + str(cu.rowcount))
        for row in cu.fetchall():
            print(row)
        # NOTE(review): close() is called after every batch and the same
        # cursor is reused afterwards, exactly as in the original harness;
        # confirm that Connection.close() leaves the cursor usable.
        c.close()
                                                                                                                                                   

This is a library I created to manage SQL Server databases. It works with the sqlcmd utility of SQL2005 and SQL2008, in addition to the osql.exe of SQL2000 and SQL7. The previous revision added the option for integrated security and the test section in the library itself; I also cleaned up some old code and comments and removed the references to the string module. Later I will add support for Sybase (or maybe someone else will do it!). If you are a system engineer or database administrator and find yourself writing a lot of scripts and batch files that call osql.exe, you will find this library useful.

25 comments

AGIER Bertrand 20 years, 11 months ago  # | flag

CallProc. I can't execute procedures like EXEC sp_addlogin ... Is there a special way to do that ?

Thanks You

khawaja butt 20 years, 1 month ago  # | flag

what's dbcp. Hi, when I run your script I get errors: dbcp is not defined. Python can't find this module. Where can I get it? Please reply. Thanks

Jorge Besada (author) 19 years, 11 months ago  # | flag

dbcp and Pretty Printer. Hi Khawaja

dbcp is the module from another collaborator (Steve Holden). Do a search on the Cookbook for "Pretty Printer" and you will find it. Save it as a module named dbcp and your dblib will work OK. I will be updating dblib soon; thanks for trying it.

Jorge Besada (author) 19 years, 11 months ago  # | flag

EXEC ok with sp_addlogin. Hi Bertrand

Thanks for testing dblib!

Sorry for the late reply

I tested using the sp_addlogin with EXEC (and without it)

and it worked fine.

Just make sure you configure the sql statement with a combination of double quotes and single quotes:

lst = cu.execute("EXEC sp_addlogin 'test3', 'test3'")

And it should work ok.

I will be working more on this from now on.

Best regards

Jorge

Updated test program follows

dblib_test.py

test program to test dblib.py

from dblib import *

from dbcp import pp #Pretty Printer imported here

c = Connection('SERVERNAME', 'sa', 'password','pubs')

print c.constr #print the connection string

print c.connected #prints 1 if connected OK

cu = c.cursor #create the cursor

lst = cu.execute('select * from authors')

print 'rowcount=' + str(cu.rowcount) #test print of record count

rows = cu.fetchall()

print pp(cu, rows, rowlens=1)

new test using sp_addlogin, no EXEC

lst = cu.execute("sp_addlogin 'test2', 'test2'")

print 'rowcount=' + str(cu.rowcount) #test print of record count

rows = cu.fetchall()

print pp(cu, rows, rowlens=1)

c.close()

new test using EXEC

lst = cu.execute("EXEC sp_addlogin 'test3', 'test3'")

print 'rowcount=' + str(cu.rowcount) #test print of record count

rows = cu.fetchall()

print pp(cu, rows, rowlens=1)

checking the logins were created:

lst = cu.execute("select name from master..syslogins")

print 'rowcount=' + str(cu.rowcount) #test print of record count

rows = cu.fetchall()

print pp(cu, rows, rowlens=1)

c.close()

-----------------------------------------------------

sasa sasa 18 years, 4 months ago  # | flag

Not getting all fields. Hi there:

I'm not getting all of the fields from my query:

sql="""select contact1.*,contact2.* from contact1 left outer join contact2 on contact1.accountno=contact2.accountno"""

c = Connection(...)

cu = c.cursor #create the cursor

lst = cu.execute(sql)

print cu.fieldnames

I'm getting 31 field names but the two combined tables should have 194.

thanks

Greg

Jorge Besada (author) 18 years, 4 months ago  # | flag

Please try this. Hi Greg - thanks for using dblib!

I made a copy of the authors table (named it authors2) from the SQL Server pubs database and ran this version of your query:

sql="select authors.*,authors2.* from authors left outer join authors2 on authors.au_id=authors2.au_id"

c = Connection('(local)',db='pubs')

cu = c.cursor #create the cursor

lst = cu.execute(sql)

print cu.fieldnames

And got the complete set of columns

['au_id', 'au_lname', 'au_fname', 'phone', 'address', 'city', 'state', 'zip', 'contract', 'au_id', 'au_lname', 'au_fname', 'phone', 'address', 'city', 'state', 'zip', 'contract']

Regards - Jorge

Brian Ironside 18 years, 2 months ago