sqlalchemy – sqlalchemy中执行原生sql – 传参方式避免了sql注入(转)()

https://blog.csdn.net/xuezhangjun0121/article/details/103993135

def get_data_all(user_id, name, start_time, end_time, page=1, limit=10):
    """
    sqlalchemy orm 执行原生sql语句
    :return:
    """
    try:
        # 项目数据列表
        conditions = dict()
        base_sql_pre = "select * from table1 p  where p.status = 1  and p.user_id = :user_id"
 
        conditions.update({"user_id": user_id})
        if start_time:
            start_time_sql = " and p.create_time >= :start_time"
            conditions.update({"start_time": start_time})
        else:
            start_time_sql = ""
        if end_time:
            end_time_sql = " and p.create_time <= :end_time"
            conditions.update({"end_time": end_time})
        else:
            end_time_sql = ""
        if name:
            name_sql = " and p.name like concat('%', :name, '%')"
            conditions.update({"name": str(name).strip()})
        else:
            name_sql = ""
        # 分页
        offset_size = (page - 1) * limit
        page_sql = " limit :limit_size offset :offset_size"
        conditions.update({
            "limit_size": limit,
            "offset_size": offset_size
        })
 
        # 组合sql
        select_sql = base_sql_pre + start_time_sql + end_time_sql + name_sql + page_sql
        cursor = db.session.execute(select_sql, conditions)
        res = cursor.fetchall()
 
        # 总数
        total_count_sql = "select count(id) from table1 p where p.status = 1 and p.user_id = :user_id" + start_time_sql + end_time_sql + name_sql
        conditions.pop("limit_size")
        conditions.pop("offset_size")
        cursor = db.session.execute(total_count_sql, conditions)
        total_count = cursor.fetchall()[0][0]
        return res, total_count
    except Exception:
        traceback.print_exc()
————————————————
版权声明:本文为CSDN博主「Saggitarxm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xuezhangjun0121/article/details/103993135
————————

https://blog.csdn.net/xuezhangjun0121/article/details/103993135

def get_data_all(user_id, name, start_time, end_time, page=1, limit=10):
    """
    sqlalchemy orm 执行原生sql语句
    :return:
    """
    try:
        # 项目数据列表
        conditions = dict()
        base_sql_pre = "select * from table1 p  where p.status = 1  and p.user_id = :user_id"
 
        conditions.update({"user_id": user_id})
        if start_time:
            start_time_sql = " and p.create_time >= :start_time"
            conditions.update({"start_time": start_time})
        else:
            start_time_sql = ""
        if end_time:
            end_time_sql = " and p.create_time <= :end_time"
            conditions.update({"end_time": end_time})
        else:
            end_time_sql = ""
        if name:
            name_sql = " and p.name like concat('%', :name, '%')"
            conditions.update({"name": str(name).strip()})
        else:
            name_sql = ""
        # 分页
        offset_size = (page - 1) * limit
        page_sql = " limit :limit_size offset :offset_size"
        conditions.update({
            "limit_size": limit,
            "offset_size": offset_size
        })
 
        # 组合sql
        select_sql = base_sql_pre + start_time_sql + end_time_sql + name_sql + page_sql
        cursor = db.session.execute(select_sql, conditions)
        res = cursor.fetchall()
 
        # 总数
        total_count_sql = "select count(id) from table1 p where p.status = 1 and p.user_id = :user_id" + start_time_sql + end_time_sql + name_sql
        conditions.pop("limit_size")
        conditions.pop("offset_size")
        cursor = db.session.execute(total_count_sql, conditions)
        total_count = cursor.fetchall()[0][0]
        return res, total_count
    except Exception:
        traceback.print_exc()
————————————————
版权声明:本文为CSDN博主「Saggitarxm」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xuezhangjun0121/article/details/103993135