配置信息
# dbconfig.py
"""Connection settings read by the Oracle helper modules."""

# Oracle server address (test environment).
# TODO: fill in the database host name or IP address; the published snippet
# only carried the bare placeholder `ip` here.
ORACLE_HOST = ""
ORACLE_SID = "orcl"  # test instance
ORACLE_USER = ""  # test user
ORACLE_PASSWORD = ""  # test password
# Oracle listener port
ORACLE_PORT = 1521
NLS_LANG = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # client encoding
# Alternative encoding; the original author notes that either one seems to
# work, so switch freely:
# NLS_LANG = 'SIMPLIFIED CHINESE_CHINA.ZHS16GBK'
ORACLE_PATH = 'C:/workFiles/instantclient_18_3'  # Instant Client path on Windows
ORACLE_ECODING = 'utf8'
下面这个版本(OracleHelper.py)不是线程安全的:
#OracleHelper.py
"""Helpers for running SQL against Oracle through cx_Oracle.

Every helper opens a new connection, runs one statement and closes the
connection again. Parameters are substituted into the SQL text with
%-formatting (``%s`` placeholders), not sent as bind variables.
"""
import cx_Oracle as oracle
from tutorial.poHelper import dbconfig

# import os
# Optional environment setup so the Oracle client picks up the configured
# encoding and client-library location:
# os.environ['NLS_LANG'] = dbconfig.NLS_LANG
# os.environ['path'] = dbconfig.ORACLE_PATH  # adjust this path as needed


class OracleHelper():
    """Builds the DSN from ``dbconfig`` once and opens connections on demand."""

    def __init__(self):
        self.dsn_tns = oracle.makedsn(
            dbconfig.ORACLE_HOST, dbconfig.ORACLE_PORT, dbconfig.ORACLE_SID
        )

    def getConnect(self):
        """Open a new connection, remember it on ``self.connect``, return it."""
        self.connect = oracle.connect(
            user=dbconfig.ORACLE_USER,
            password=dbconfig.ORACLE_PASSWORD,
            dsn=self.dsn_tns,
            threaded=True,
            events=True,
        )
        return self.connect


_oracleManager = OracleHelper()


def getConn():
    """Return a newly opened database connection."""
    return _oracleManager.getConnect()


def fetchall(sql, param=None):
    """Run a query and return all rows (``[]`` if the query failed).

    ``sql`` uses ``%s`` placeholders that are filled from ``param``.
    """
    return _execute_query_by_sql_param(sql, param=param)


def fetchone(sql, param=None):
    """Run a query limited to its first row.

    Returns a list holding at most one row (``[]`` if the query failed).
    """
    sql = get_complete_sql(sql, param)
    sql = "select * from ( %s ) where rownum = 1" % sql
    # The SQL is already complete here. Formatting it a second time would
    # fail as soon as a substituted value contains a '%' character, so it is
    # only echoed and then executed directly.
    print(sql)
    return _execute_query_oracle_sql(sql=sql)


def insert(sql, param=None):
    """Run an INSERT and return the affected row count (0 on failure)."""
    return _execute_commit(get_complete_sql(sql, param))


def update(sql, param=None):
    """Run an UPDATE and return the affected row count (0 on failure)."""
    return _execute_commit(get_complete_sql(sql, param))


def insertTwo(sql1, sql2, param1, param2):
    """Run two INSERTs one after the other and return the summed row count.

    Each statement uses its own connection and commit, so the pair is not
    atomic.
    """
    return insert(sql1, param1) + insert(sql2, param2)


def delete(sql, param):
    """Run a DELETE and return the affected row count (0 on failure)."""
    return _execute_commit(get_complete_sql(sql, param))


def executemany(sql, params):
    """Run ``sql`` once per parameter list in ``params``.

    Returns the total affected row count across all executions.
    """
    return sum(update(sql=sql, param=item) for item in params)


def _execute_query_by_sql_param(sql, param=None):
    """Fill the placeholders in ``sql`` from ``param`` and run the query."""
    return _execute_query_oracle_sql(sql=get_complete_sql(sql, param))


def _execute_commit(sql):
    """Execute a complete DML statement and commit it.

    Returns the affected row count, or 0 if anything failed (the error is
    printed, not raised).
    """
    # Pre-bind both names so the ``finally`` clause is safe even when opening
    # the connection itself fails.
    connect = cursor = None
    try:
        connect = getConn()
        cursor = connect.cursor()
        cursor.execute(sql)
        affected = cursor.rowcount
        connect.commit()
        # Only report rows once the commit has actually succeeded.
        return affected
    except Exception as e:
        print(e)
        return 0
    finally:
        close_oracle(cursor, connect)


def _execute_query_oracle_sql(sql):
    """Execute a complete query and return all rows (``[]`` on failure)."""
    connect = cursor = None
    try:
        connect = getConn()
        cursor = connect.cursor()
        cursor.execute(sql)
        # Other fetch styles are possible here: fetchmany(n) or fetchone().
        return cursor.fetchall()
    except Exception as e:
        print(e)
        return []
    finally:
        close_oracle(cursor, connect)


def close_oracle(cursor, connect):
    """Close the cursor and then the connection; either may be ``None``.

    Errors are printed, and a failure to close the cursor does not prevent
    the connection from being closed.
    """
    for resource in (cursor, connect):
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as e:
            print(e)


def chang_list_param_to_tuple(param=None):
    """Turn parameter values into a tuple of single-quoted SQL literals.

    Embedded single quotes are doubled so that a value cannot terminate its
    own literal. NOTE(review): bind variables would be the more robust
    approach, but callers pass ``%s``-style SQL, so the textual substitution
    is kept.
    """
    if param is None:
        return ()
    return tuple("'%s'" % str(item).replace("'", "''") for item in param)


def get_complete_sql(sql, param=None):
    """Substitute ``param`` into the ``%s`` placeholders of ``sql``.

    The finished statement is printed and returned.
    """
    sql = sql % chang_list_param_to_tuple(param=param)
    print(sql)
    return sql


if __name__ == '__main__':
    # sql = "select * from vc_user where instr(real_name,%s)>0"
    # param = ["张"]
    sql = "update po_monitor_lexicon set BINARY_SYSTEM = :1 ,IS_CRAWLED='N' where monitor_key = '金蝶办公'"
    param = [['0'], ['2'], ['3']]
    # result = _execute_many(sql,param)
    # result = fetchall(sql,param)
    # print(result)
    # result = fetchone(sql,param)
    # print(result)
下面是配合框架使用的线程安全版本(基于 DBUtils 的 PooledDB 连接池),大家可以放心使用:
"""Helpers for running SQL against Oracle through a DBUtils connection pool.

Statements use Oracle bind variables (``:1``, ``:2``, ...) with the values
passed separately. Failures are printed and reported as ``None`` rather than
raised.
"""
try:
    from DBUtils.PooledDB import PooledDB  # DBUtils 1.x module path
except ImportError:
    from dbutils.pooled_db import PooledDB  # DBUtils 2.0+ renamed the module
from tutorial.poHelper import dbconfig
import cx_Oracle as oracle

# import os
# Optional environment setup so the Oracle client picks up the configured
# encoding and client-library location:
# os.environ['NLS_LANG'] = dbconfig.NLS_LANG
# os.environ['path'] = dbconfig.ORACLE_PATH  # adjust this path as needed


class OracleHelper:
    """Owns the ``PooledDB`` pool built from the ``dbconfig`` settings."""

    def __init__(self):
        # DSN layout: host + ":" + port + "/" + dsn
        connKwargs = {
            'user': dbconfig.ORACLE_USER,
            'password': dbconfig.ORACLE_PASSWORD,
            'dsn': dbconfig.ORACLE_HOST + ":" + str(dbconfig.ORACLE_PORT)
                   + "/" + dbconfig.ORACLE_SID,
            'nencoding': dbconfig.ORACLE_ECODING,
            'threaded': True,
        }
        self._pool = PooledDB(
            oracle,
            mincached=3,
            maxcached=20,
            maxshared=20,
            maxusage=10000,
            blocking=True,
            **connKwargs
        )

    def getConn(self):
        """Return a connection obtained from the pool."""
        return self._pool.connection()


_oracleManager = OracleHelper()


def getConn():
    """Return a database connection from the module-level pool."""
    return _oracleManager.getConn()


def _release(cursor, conn):
    """Close the cursor and then the connection; either may be ``None``."""
    for resource in (cursor, conn):
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as e:
            print(e)


def _run_write(operation):
    """Run ``operation(cursor)`` and commit; roll back and print on failure.

    Returns whatever ``operation`` returns, or ``None`` if anything failed.
    """
    # Pre-bind both names so cleanup is safe even when getConn() itself fails.
    conn = cursor = None
    try:
        conn = getConn()
        cursor = conn.cursor()
        rowcount = operation(cursor)
        conn.commit()
        return rowcount
    except Exception as e:
        print(e)
        if conn is not None:
            # Discard any partial work before the connection is released.
            try:
                conn.rollback()
            except Exception as rollback_error:
                print(rollback_error)
        return None
    finally:
        _release(cursor, conn)


def _run_query(sql, params, fetch):
    """Execute a query and return ``fetch(cursor)``, or ``None`` on failure."""
    conn = cursor = None
    try:
        conn = getConn()
        cursor = conn.cursor()
        cursor.execute(sql, [] if params is None else params)
        return fetch(cursor)
    except Exception as e:
        print(e)
        return None
    finally:
        _release(cursor, conn)


def insert(sql, params):
    """Run an INSERT; returns the affected row count, or ``None`` on failure."""
    return __execute(sql, params)


def insertTwo(sql1, sql2, params1, params2):
    """Run two statements in one transaction; see ``__save``."""
    return __save(sql1, sql2, params1, params2)


def update(sql, params):
    """Run an UPDATE; returns the affected row count, or ``None`` on failure."""
    return __execute(sql, params)


def delete(sql, params):
    """Run a DELETE; returns the affected row count, or ``None`` on failure."""
    return __execute(sql, params)


def __execute(sql, param=None):
    """Execute one DML statement with bind values and commit it.

    Returns the affected row count, or ``None`` if the statement failed.
    """
    bind_values = [] if param is None else param

    def run(cursor):
        cursor.execute(sql, bind_values)
        # execute() does not return a count for DML; the cursor's rowcount
        # attribute carries it.
        return cursor.rowcount

    return _run_write(run)


def __save(sql1, sql2, param1=None, params2=None):
    """Execute two DML statements on one connection and commit them together.

    Returns the combined affected row count of both statements, or ``None``
    if either one failed (in which case neither is committed).
    """
    first_values = [] if param1 is None else param1
    second_values = [] if params2 is None else params2

    def run(cursor):
        cursor.execute(sql1, first_values)
        total = cursor.rowcount
        cursor.execute(sql2, second_values)
        return total + cursor.rowcount

    return _run_write(run)


def fetchone(sql, params=None):
    """Return the first row of the query result.

    Returns ``None`` when there is no row or the query failed.
    """
    return _run_query(sql, params, lambda cursor: cursor.fetchone())


def fetchall(sql, params=None):
    """Return all rows of the query result, or ``None`` if the query failed."""
    return _run_query(sql, params, lambda cursor: cursor.fetchall())


def executemany(sql, params):
    """Execute ``sql`` once per bind-value sequence in ``params`` and commit.

    Returns the affected row count reported by the cursor, or ``None`` on
    failure.
    """
    def run(cursor):
        cursor.executemany(sql, params)
        return cursor.rowcount

    return _run_write(run)


if __name__ == '__main__':
    sql = "select * from vc_user where instr(real_name, :1 ) > 0"
    param = ['张']
    # sql = "update po_monitor_lexicon set BINARY_SYSTEM = :1 ,IS_CRAWLED='N' where monitor_key = '金蝶办公'"
    # param = [['0'],['2'],['3']]
    # result = _execute_many(sql,param)
    result = fetchone(sql, param)
    # fetchone() yields None when nothing matched or the query failed.
    if result is not None:
        for item in result:
            print(item)
        print(len(result))
    # result = fetchone(sql,param)
    # print(result)