吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2800|回复: 7
收起左侧

[Python 转载] 【Python】Mysql操作类,仿Thinkphp

  [复制链接]
smallchen 发表于 2021-2-18 13:44
本帖最后由 smallchen 于 2021-2-18 14:07 编辑

还是习惯用Thinkphp的方式写sql查询,于是自己封装了一个
能自动识别字段并封装,插入和更新的时候如果有冗余的字段数据会剔除
代码如下:
调用方式如:
data = {
    'name': '我是',
    'title': '你爸爸',
    'create_time': datetime.datetime(2021, 3, 18, 0, 0)
}
result1 = Db().table('test2').insert(data)
result2 = Db().table('test2').whereRow('id', 1).update(data)
result3 = Db().table('test2').whereRow('id', 1).setOption('name', '你好')
result4 = Db().table('test2').insertGetId(data)
result5 = Db().table('test2').where([{'id', '(1,2)', 'in'}, {'name', '%%', 'like'}]).field('id,name').select()
result6 = Db().table('test2').where({'id', '(1,2)', 'in'}).field('id,name').find()
result7 = Db().table('test2').whereRow('id', 1).setInc('num', 2)
result8 = Db().table('test2').whereRow('id', 1).value('title')…………

如果用同一个链接,也可以
sql = Db();
sql .table('test2').insert(data)
sel.clear()
sql.table('test2').whereRow('id', 1).update(data)
[Python] 纯文本查看 复制代码
import pymysql
#   pip install pymysql

#   1、实例化类 sql = Db()
#   参数类:
#   table方法:入参:表名
#       使用方法: sql.table('test')
#   where方法:可多次调用,入参:字符串或字典(key:字段名,val:值,type:对应关系,默认为'=')或数组
#       使用方法: sql.where('id=1')
#       使用方法: sql.where({'id',1})
#       使用方法: sql.where({'id',1,'<>'})
#       使用方法: sql.where({'id','(1,2)','in'})
#       使用方法: sql.where([{'id','(1,2)','in'},{'name','%我%','like'}])
#   whereRow方法:可多次调用,入参:字段名,值,对应关系
#       使用方法: sql.whereRow('id',1)
#       使用方法: sql.whereRow('id',1,'>')
#   whereIn方法:可多次调用,入参:字段名,值(逗号拼接的字符串或元组)
#       使用方法: sql.whereIn('id','1,2')
#       使用方法: sql.whereIn('id',(1,2))
#   whereLike方法:可多次调用,入参:字段名,值
#       使用方法: sql.whereLike('name','%我%')
#   alias方法:对 table 方法的表名设置 别名
#       使用方法: sql.table('test').alias('a')
#   limit方法:查询前N条数据,入参:数量
#       使用方法: sql.limit(10)
#   order方法:排序,入参:排序字符串
#       使用方法: sql.order('id desc')
#   group方法:分组,入参:字符串
#       使用方法: sql.group('name,title')
#   field方法:查询字段,入参:字符串
#       使用方法: sql.field('id,name,title,create_time')
#   distinct方法:去重,入参:True或False,默认为True
#       使用方法: sql.distinct()
#   having方法:入参:字符串
#       使用方法: sql.having('b.id=1')
# =====================================================================================
#   操作类:
#   buildSql    返回select查询语句,出参:字符串
#       使用方法: sql.buildSql()
#   find    返回符合条件的第一条数据,出参:字典
#       使用方法: sql.find()
#   count   返回符合条件的数量:出参:Int
#       使用方法: sql.count()
#   value   入参:字段名,返回符合条件的一个字段的值,出参:值
#       使用方法: sql.value('name')
#   select  返回符合条件的多数据,出参:数组<字典>
#       使用方法: sql.select()
#   get 入参:值,识别主键返回符合条件的第一条数据,出参:字典
#       使用方法: sql.get(2)
#   insert  入参:字典,字段识别后根据键名插入数据,自动剔除表中没有的字段,出参:变更数量
#       使用方法: sql.insert({'id':1,'name':'你好'})
#   insertGetId 入参:字典,字段识别后根据键名插入数据并返回自增主键值,出参:值
#       使用方法: sql.insertGetId({'id':1,'name':'你好'})
#   insertAll   入参:数组<字典>,同 insert 的数组版,出参:变更数量
#       使用方法: sql.insertAll([{'id':1,'name':'你好'}])
#   update  入参:字典,根据条件更新数据,出参:变更数量
#       使用方法: sql.update({'name':'你好'})
#   setOption 入参:字段名,变更值,根据条件更新数据,出参:变更数量
#       使用方法: sql.setOption('name','你好')
#   delete 根据条件删除数据,出参:变更数量
#       使用方法: sql.delete()
#   setInc 入参:字段名,数字(步仅值,默认为1);字段值自增,出参:变更数量
#       使用方法: sql.setInc('num')
#       使用方法: sql.setInc('num',2)
#   setDec 入参:字段名,数字(步仅值,默认为1);字段值自减,出参:变更数量
#       使用方法: sql.setDec('num')
#       使用方法: sql.setDec('num',2)
#   query 入参:数据库语句
#       使用方法: sql.query('delete from id=1')

HOST = '127.0.0.1'  # 链接地址
USER = 'root'  # 数据库用户
PASS = '123456'  # 数据库密码
CHAR = 'utf8'  # 编码格式
DB = 'test'  # 数据库名称
PORT = 3306  # 数据库端口号
PREFIX = 'bs_'  # 表前缀


def format_field(val, mold):
    mold = str(mold).lower()
    if 'int' in mold:
        return str(val)
    if 'decimal' in mold:
        return str(val)
    if 'float' in mold:
        return str(val)
    if 'double' in mold:
        return str(val)
    if 'char' in mold:
        return format_val(val)
    if 'text' in mold:
        return format_val(val)
    if 'date' in mold:
        return format_val(str(val))
    if 'time' in mold:
        return format_val(str(val))
    if 'year' in mold:
        return format_val(str(val))
    if 'blob' in mold:
        return format_val(val)
    return format_val(val)


def format_val(val):
    types = typeof(val)
    if types == 'str':
        return "'%s'" % str(val).replace('\'', '"')
    if types == 'int':
        return "%s" % val


def typeof(variate):
    mold = None
    if isinstance(variate, int):
        mold = "int"
    elif isinstance(variate, str):
        mold = "str"
    elif isinstance(variate, float):
        mold = "float"
    elif isinstance(variate, list):
        mold = "list"
    elif isinstance(variate, tuple):
        mold = "tuple"
    elif isinstance(variate, dict):
        mold = "dict"
    elif isinstance(variate, set):
        mold = "set"
    return mold


class Db(object):
    __conn = None
    __map = []
    __name = ''
    __column = '*'
    __alias = ''
    __join = []
    __having = []
    __distinct = False
    __option = {}

    def __init__(self, host=HOST, username=USER, password=PASS, db=DB, charset=CHAR, port=PORT, prefix=PREFIX):
        self.host = host
        self.username = username
        self.password = password
        self.db = db
        self.charset = charset
        self.port = port
        self.prefix = prefix

    def __connect(self):
        self.__conn = pymysql.connect(host=self.host, port=self.port, user=self.username, password=self.password,
                                      db=self.db, charset=self.charset)
        self.cursor = self.__conn.cursor()

    def __close(self):
        self.cursor.close()
        self.__conn.close()

    def table(self, table):
        self.__name = self.prefix + table
        return self

    def where(self, where=None):
        if where is None:
            return self
        if typeof(where) == 'dict':
            self.__map.append({'key': where.get('key'), 'val': where.get('val'), 'type': where.get('type')})

        if typeof(where) == 'list':
            for item in where:
                self.where(item)
        if typeof(where) == 'str':
            self.__map.append(where)
        return self

    def whereRow(self, key, val, mark='='):
        return self.where({'key': key, 'val': val, 'type': mark})

    def whereIn(self, key, val):
        if typeof(val) == 'tuple':
            return self.where({'key': key, 'val': '(' + ','.join(val) + ')', 'type': 'in'})
        return self.where({'key': key, 'val': '(' + val + ')', 'type': 'in'})

    def whereLike(self, key, val):
        return self.where({'key': key, 'val': val, 'type': 'like'})

    def alias(self, name=''):
        self.__alias = name + ' '
        return self

    #   字段没处理明白,暂时禁用
    #def join(self, table, where, mold='inner'):
        #table = self.prefix + table
        #self.__join.append({'table': table, 'where': where, 'mold': mold})
        #return self
    #

    def limit(self, limit=1):
        self.__option['limit'] = format_val(limit)
        return self

    def order(self, order):
        self.__option['order'] = order
        return self

    def group(self, group):
        self.__option['group'] = group
        return self

    def field(self, field):
        self.__column = field
        return self

    def distinct(self, is_true=True):
        self.__distinct = is_true
        return self

    def having(self, where):
        self.__having.append(where)
        return self

    def __comQuerySql(self):
        if self.__name is None:
            return None

        column = self.__column
        if column == '*':
            column = self.__showColumn(True)

        sql = "select "
        if self.__distinct:
            sql += ' distinct '

        sql += str(column) + " from " + str(self.__name)

        if self.__alias != '':
            sql += ' ' + self.__alias

        if len(self.__join) > 0:
            if '*' in column:
                print('使用 join 必须指定字段名')
                exit(-1)
                return None
            for item in self.__join:
                sql += item['mold'] + ' join ' + item['table'] + ' on ' + item['where'] + ' '

        if len(self.__map) > 0:
            sql += ' where 1=1 '
            for item in self.__map:
                if typeof(item) == 'str':
                    sql += ' and ( ' + item + ' ) '
                elif typeof(item) == 'dict':
                    sql += ' and ( ' + item.get('key') + ' ' + item.get('type') + ' ' + str(item.get('val')) + ' ) '

        if self.__option.get('group'):
            sql += ' group by ' + self.__option['group'] + ' '

        if len(self.__having) > 0:
            for item in self.__having:
                sql += ' ' + item

        if self.__option.get('order'):
            sql += ' order by ' + self.__option['order'] + ' '

        if self.__option.get('limit'):
            sql += ' limit ' + self.__option['limit']
        return sql

    def buildSql(self):
        column = self.__column

        if column == '*':
            column = self.__showColumn(True)

        sql = self.__comQuerySql()
        return sql

    def __getField(self, table=None):
        column = self.__column
        if '*' in column:
            column = self.__showColumn(True, table=table)

        return column

    # 接下来是数据库操作

    def find(self):

        column = self.__getField()
        sql = self.__comQuerySql()
        if sql is None:
            return None
        result = None
        try:
            self.__connect()
            self.cursor.execute(sql)
            result = self.cursor.fetchone()
            self.__close()
        except Exception as e:
            print(e)
            exit(-1)
        if result is None:
            return None
        data = {}
        for index, k in enumerate(column.split(',')):
            data[k] = result[index]
        return data

    def select(self):
        if self.__name is None:
            return

        column = self.__getField()

        sql = self.__comQuerySql()
        if sql is None:
            return None
        result = None
        try:
            self.__connect()
            self.cursor.execute(sql)
            result = self.cursor.fetchall()
            self.__close()
        except Exception as e:
            print(e)
            exit(-1)
        if result is None:
            return None
        data = []
        sp = column.split(',')
        for index, item in enumerate(result):
            dicts = {}
            for index2, k in enumerate(sp):
                dicts[k] = result[index][index2]
            data.append(dicts)
        return data

    def get(self, vid):
        pk = self.__getPk()
        if pk is not None:
            self.whereRow(pk, vid)
        return self.find()

    def value(self, field):
        self.__column = field
        sql = self.__comQuerySql()
        result = None
        try:
            self.__connect()
            self.cursor.execute(sql)
            result = self.cursor.fetchone()
            self.__close()
        except Exception as e:
            print(e)
            exit(-1)
        if result is not None:
            return result[0]
        else:
            return None

    def count(self):
        return self.value('count(*)')

    def insert(self, data):
        all_column = self.__showColumn()
        fields = ''
        values = ''
        i = 0
        for key in data:
            for column in all_column:
                if column['field'] == key:
                    if i == 0:
                        fields = key
                        values = format_field(data[key], column['type'])
                    else:
                        fields += ',' + key
                        values += ',' + format_field(data[key], column['type'])
                    i += 1
        if fields == '' or values == '':
            return 0
        sql = str(" insert into " + self.__name + " ( " + fields + ") values ( " + values + " ) ")
        return self.__edit(sql)

    def update(self, data):
        if typeof(data) != 'dict':
            return None
        all_column = self.__showColumn()
        fields = ''
        i = 0
        sql = ''
        if len(self.__map) > 0:
            sql += ' where 1=1 '
            for item in self.__map:
                if typeof(item) == 'str':
                    sql += ' and ( ' + item + ' ) '
                elif typeof(item) == 'dict':
                    sql += ' and ( ' + item.get('key') + ' ' + item.get('type') + ' ' + str(item.get('val')) + ' ) '
        else:
            print('禁止不使用 where 更新数据')
        for key in data:
            for column in all_column:
                if column['field'] == key:
                    if i == 0:
                        fields = key + '=' + format_field(data[key], column['type'])
                    else:
                        fields += str(',' + key + '=' + format_field(data[key], column['type']))
                    i += 1
        if fields == '':
            return 0
        sql = str("update " + self.__name + " set " + fields + ' ' + sql)
        print(sql)
        return self.__edit(sql)

    def insertGetId(self, data):
        if typeof(data) != 'dict':
            return None
        all_column = self.__showColumn()
        fields = ''
        values = ''
        i = 0
        for key in data:
            for column in all_column:
                if column['field'] == key:
                    if i == 0:
                        fields = key
                        values = format_field(data[key], column['type'])
                    else:
                        fields += ',' + key
                        values += ',' + format_field(data[key], column['type'])
                    i += 1
        if fields == '' or values == '':
            return 0
        sql = str(" insert into " + self.__name + " ( " + fields + ") values ( " + values + " ) ")
        pk = 0
        try:
            self.__connect()
            self.cursor.execute(sql)
            pk = self.__conn.insert_id()
            self.__close()
        except Exception as e:
            self.__conn.rollback()
        return pk

    def setOption(self, key, val):
        return self.update({key: val})

    def insertAll(self, datas):
        if typeof(datas) != 'list':
            return None
        count = 0
        for data in datas:
            count += self.insert(data)
        return count

    def delete(self):
        i = 0
        sql = ''
        if len(self.__map) > 0:
            sql += ' where 1=1 '
            for item in self.__map:
                if typeof(item) == 'str':
                    sql += ' and ( ' + item + ' ) '
                elif typeof(item) == 'dict':
                    sql += ' and ( ' + item.get('key') + ' ' + item.get('type') + ' ' + str(item.get('val')) + ' ) '
        else:
            print('禁止不使用 where 删除数据')
        sql = str("delete from " + self.__name + sql)
        return self.__edit(sql)

    def setInc(self, key, step=1):
        return self.update({key: key + '+' + str(step)})

    def setDec(self, key, step=1):
        return self.update({key: key + '-' + str(step)})

    def query(self, sql):
        return self.__edit(sql)

    def __showColumn(self, is_str=False, table=None):
        if table is None:
            table = self.__name
        list_data = None
        sql = "SHOW FULL COLUMNS FROM " + table
        try:
            self.__connect()
            self.cursor.execute(sql)
            list_data = self.cursor.fetchall()
            self.__close()
        except Exception as e:
            print(e)
        if is_str:
            return ','.join(list([item[0] for item in list_data]))
        return list([{'field': item[0], 'type': item[1], 'key': item[4]} for item in list_data])

    def __getPk(self):
        fields = self.__showColumn()
        for field in fields:
            if field['key'] == 'PRI':
                return field['field']
        return None

    def __edit(self, sql):
        count = 0
        try:
            self.__connect()
            count = self.cursor.execute(sql)
            self.__conn.commit()
            self.__close()
        except Exception as e:
            self.__conn.rollback()
            print('Error:  ', sql)
            print(e)
        return count

    def clear(self):
        self.__map = []
        self.__name = ''
        self.__column = '*'
        self.__alias = ''
        self.__join = []
        self.__having = []
        self.__distinct = False
        self.__option = {}

免费评分

参与人数 3吾爱币 +7 热心值 +3 收起 理由
yk2575173948 + 1 + 1 热心回复!
mengqiu + 1 + 1 终于有人忍不住做好了
苏紫方璇 + 5 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

田三水 发表于 2021-2-18 13:56
感谢大佬!
龙小 发表于 2021-2-18 13:59
ggganb 发表于 2021-2-18 14:21
laochaguan 发表于 2021-2-18 14:26
感谢大佬!
酷鸟 发表于 2021-2-18 14:32
牛逼啊  建议开个github 修复更新都方便
逝去的初夏 发表于 2021-2-18 15:38
看着挺有意思的 链式编程全部都返回自己
hq8205 发表于 2021-2-18 22:01
不错,挺有想法, 先收藏了
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-16 08:59

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表