吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 419|回复: 12
收起左侧

[求助] 这个能看出当前Python的版本适宜几点几吗?

[复制链接]
冥界3大法王 发表于 2024-11-5 14:39


[Python] 纯文本查看 复制代码
#!/usr/bin/env python3

"""
Copyright (C) 2020, Alexandre Gazet.

This file is part of ret-sync plugin for Binary Ninja.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import binaryninjaui
if 'qt_major_version' in binaryninjaui.__dict__ and binaryninjaui.qt_major_version == 6:
    from PySide6 import QtCore
    from PySide6.QtCore import Qt
else:
    from PySide2 import QtCore
    from PySide2.QtCore import Qt

from binaryninjaui import DockHandler
from binaryninjaui import UIAction, UIActionHandler, UIContext, UIContextNotification
from binaryninjaui import ViewFrame

from binaryninja.plugin import BackgroundTaskThread, PluginCommand


from collections import OrderedDict
import socket
import io
import sys
import asyncore
import threading
import json
import base64
from pathlib import Path
from pathlib import Path as RemotePath
from dataclasses import dataclass

from .retsync import rsconfig as rsconfig
from .retsync.rsconfig import rs_encode, rs_decode, rs_log, rs_debug, load_configuration
from .retsync.rswidget import SyncDockWidget


class SyncHandler(object):

    # location request, update disassembly view
    def req_loc(self, sync):
        offset, base = sync['offset'], sync.get('base')
        self.plugin.goto(base, offset)

    def req_rbase(self, sync):
        self.plugin.set_remote_base(sync['rbase'])

    def req_cmt(self, sync):
        offset, base, cmt = sync['offset'], sync.get('base'), sync['msg']
        self.plugin.add_cmt(base, offset, cmt)

    def req_fcmt(self, sync):
        offset, base, cmt = sync['offset'], sync.get('base'), sync['msg']
        self.plugin.add_fcmt(base, offset, cmt)

    def req_rcmt(self, sync):
        offset, base = sync['offset'], sync.get('base')
        self.plugin.reset_cmt(base, offset)

    def req_cmd(self, sync):
        msg_b64, offset, base = sync['msg'], sync['offset'], sync['base']
        cmt = rs_decode(base64.b64decode(msg_b64))
        self.plugin.add_cmt(base, offset, cmt)

    def req_cursor(self, sync):
        cursor_addr = self.get_cursor()
        if cursor_addr:
            self.client.send(rs_encode(hex(cursor_addr)))
        else:
            rs_log('failed to get cursor location')

    def req_not_implemented(self, sync):
        rs_log(f"request type {sync['type']} not implemented")

    def parse(self, client, sync):
        self.client = client
        stype = sync['type']
        if stype not in self.req_handlers:
            rs_log("unknown sync request: %s" % stype)
            return

        if not self.plugin.sync_enabled:
            rs_debug("[-] %s request droped because no program is enabled" % stype)
            return

        req_handler = self.req_handlers[stype]
        req_handler(sync)

    def __init__(self, plugin):
        self.plugin = plugin
        self.client = None
        self.req_handlers = {
            'loc': self.req_loc,
            'rbase': self.req_rbase,
            'cmd': self.req_cmd,
            'cmt': self.req_cmt,
            'rcmt': self.req_rcmt,
            'fcmt': self.req_fcmt,
            'cursor': self.req_cursor,

            'raddr': self.req_not_implemented,
            'patch': self.req_not_implemented,
            'rln': self.req_not_implemented,
            'rrln': self.req_not_implemented,
            'lbl': self.req_not_implemented,
            'bps_get': self.req_not_implemented,
            'bps_set': self.req_not_implemented,
            'modcheck': self.req_not_implemented,
        }


class NoticeHandler(object):

    def is_windows_dbg(self, dialect):
        return (dialect in ['windbg', 'x64_dbg', 'ollydbg2'])

    def req_new_dbg(self, notice):
        dialect = notice['dialect']
        rs_log(f"new_dbg: {notice['msg']}")
        self.plugin.bootstrap(dialect)

        if sys.platform.startswith('linux') or sys.platform == 'darwin':
            if self.is_windows_dbg(dialect):
                global RemotePath
                from pathlib import PureWindowsPath as RemotePath

    def req_dbg_quit(self, notice):
        self.plugin.reset_client()

    def req_dbg_err(self, notice):
        self.plugin.sync_enabled = False
        rs_log("dbg err: disabling current program")

    def req_module(self, notice):
        pgm = RemotePath(notice['path']).name
        if not self.plugin.sync_mode_auto:
            rs_log(f"sync mod auto off, dropping mod request ({pgm})")
        else:
            self.plugin.set_program(pgm)

    def req_idb_list(self, notice):
        output = "open program(s):\n"
        for i, pgm in enumerate(self.plugin.pgm_mgr.as_list()):
            is_active = ' (*)' if pgm.path.name == self.plugin.current_pgm else ''
            output += f"[{i}] {str(pgm.path.name)} {is_active}\n"

        self.plugin.broadcast(output)

    def req_idb_n(self, notice):
        idb = notice['idb']
        try:
            idbn = int(idb)
        except (TypeError, ValueError) as e:
            self.plugin.broadcast('> index error: n should be a decimal value')
            return

        self.plugin.set_program_id(idbn)

    def req_sync_mode(self, notice):
        mode = notice['auto']
        rs_log(f"sync mode auto: {mode}")
        if mode == 'on':
            self.plugin.sync_mode_auto = True
        elif mode == 'off':
            self.plugin.sync_mode_auto = False
        else:
            rs_log(f"sync mode unknown: {mode}")

    def req_bc(self, notice):
        action = notice['msg']

        if action == 'on':
            self.plugin.cb_trace_enabled = True
            rs_log('color trace enabled')
        elif action == 'off':
            self.plugin.cb_trace_enabled = False
            rs_log('color trace disabled')
        elif action == 'oneshot':
            self.plugin.cb_trace_enabled = True

    def parse(self, notice):
        ntype = notice['type']
        if ntype not in self.req_handlers:
            rs_log("unknown notice request: %s" % ntype)
            return

        req_handler = self.req_handlers[ntype]
        req_handler(notice)

    def __init__(self, plugin):
        self.plugin = plugin
        self.req_handlers = {
            'new_dbg': self.req_new_dbg,
            'dbg_quit': self.req_dbg_quit,
            'dbg_err': self.req_dbg_err,
            'module': self.req_module,
            'idb_list': self.req_idb_list,
            'sync_mode': self.req_sync_mode,
            'idb_n': self.req_idb_n,
            'bc': self.req_bc,
        }


class RequestType(object):
    NOTICE = '[notice]'
    SYNC = '[sync]'

    @staticmethod
    def extract(request):
        if request.startswith(RequestType.NOTICE):
            return RequestType.NOTICE
        elif request.startswith(RequestType.SYNC):
            return RequestType.SYNC
        else:
            return None

    @staticmethod
    def normalize(request, tag):
        request = request[len(tag):]
        request = request.replace("\\", "\\\\")
        request = request.replace("\n", "")
        return request.strip()


class RequestHandler(object):

    def __init__(self, plugin):
        self.plugin = plugin
        self.client_lock = threading.Lock()
        self.notice_handler = NoticeHandler(plugin)
        self.sync_handler = SyncHandler(plugin)

    def safe_parse(self, client, request):
        self.client_lock.acquire()
        self.parse(client, request)
        self.client_lock.release()

    def parse(self, client, request):
        req_type = RequestType.extract(request)

        if not req_type:
            rs_log("unknown request type")
            return

        payload = RequestType.normalize(request, req_type)

        try:
            req_obj = json.loads(payload)
        except ValueError:
            rs_log("failed to parse request JSON\n %s\n" % payload)
            return

        rs_debug(f"REQUEST{req_type}:{req_obj['type']}")

        if req_type == RequestType.NOTICE:
            self.notice_handler.parse(req_obj)
        elif req_type == RequestType.SYNC:
            self.sync_handler.parse(client, req_obj)


class ClientHandler(asyncore.dispatcher_with_send):

    def __init__(self, sock, request_handler):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.request_handler = request_handler

    def handle_read(self):
        data = rs_decode(self.recv(8192))

        if data and data != '':
            fd = io.StringIO(data)
            batch = fd.readlines()

            for request in batch:
                self.request_handler.safe_parse(self, request)
        else:
            rs_debug("handler lost client")
            self.close()

    def handle_expt(self):
        rs_log("client error")
        self.close()

    def handle_close(self):
        rs_log("client quit")
        self.close()


class ClientListener(asyncore.dispatcher):

    def __init__(self, plugin):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind((plugin.user_conf.host, plugin.user_conf.port))
        self.listen(1)
        self.plugin = plugin

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            rs_log('incoming connection from %s' % repr(addr))
            self.plugin.client = ClientHandler(sock, self.plugin.request_handler)

    def handle_expt(self):
        rs_log("listener error")
        self.close()

    def handle_close(self):
        rs_log("listener close")
        self.close()


class ClientListenerTask(threading.Thread):

    def __init__(self, plugin):
        threading.Thread.__init__(self)
        self.plugin = plugin
        self.server = None

    def is_port_available(self, host, port):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            if sys.platform == 'win32':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
            sock.bind((host, port))
            return True
        except Exception as e:
            return False
        finally:
            sock.close()

    def run(self):
        if not self.is_port_available(self.plugin.user_conf.host, self.plugin.user_conf.port):
            rs_log(f"aborting, port {self.plugin.user_conf.port} already in use")
            self.plugin.cmd_syncoff()
            return

        try:
            self.server = ClientListener(self.plugin)
            self.plugin.reset_client()
            rs_log('server started')
            asyncore.loop()
        except Exception as e:
            rs_log('server initialization failed')
            self.cancel()
            self.plugin.cmd_syncoff()

    def cancel(self):
        if self.server:
            rs_log('server shutdown')
            asyncore.close_all()
            self.server.close()
            self.server = None


@dataclass
class Program:
    path: Path
    base: int = None


# ProgramManager is used to keep track of opened tabs
# and program states
class ProgramManager(object):
    def __init__(self):
        self.opened = OrderedDict()

    def add(self, file):
        ppath = Path(file)
        if ppath.name in self.opened:
            rs_log(f"name collision:\n  - {ppath.name}\n  - {self.opened[ppath.name].name}")
        else:
            self.opened[ppath.name] = Program(ppath)

    def remove(self, file):
        pgm = Path(file).name
        if pgm in self.opened:
            del self.opened[pgm]

    def exists(self, pgm):
        return (Path(pgm).name in self.opened)

    def reset_bases(self):
        for _, pgm in self.opened.items():
            pgm.base = None

    def get_base(self, pgm):
        if self.exists(pgm):
            return self.opened[pgm].base

    def set_base(self, pgm, base):
        if self.exists(pgm):
            self.opened[pgm].base = base

    def get_at(self, index):
        if len(self.opened) > index:
            return list(self.opened)[index]
        else:
            return None

    def as_list(self):
        return self.opened.values()

    def list_dyn(self):
        self.opened = {}
        dock = DockHandler.getActiveDockHandler()
        view_frame = dock.getViewFrame()

        if view_frame:
            frames = view_frame.parent()
            for i in range(frames.count()):
                widget = frames.widget(i)
                vf = ViewFrame.viewFrameForWidget(widget)
                if vf:
                    file_name = vf.getFileContext().getFilename
                    self.add(file_name)

        return self.opened


class SyncPlugin(UIContextNotification):

    def __init__(self):
        UIContextNotification.__init__(self)
        UIContext.registerNotification(self)
        self.request_handler = RequestHandler(self)
        self.client_listener = None
        self.client = None
        self.next_tab_lock = threading.Event()
        self.pgm_mgr = ProgramManager()

        # binary ninja objects
        self.widget = None
        self.view_frame = None
        self.view = None
        self.binary_view = None
        self.frame = None

        # context
        self.current_tab = None
        self.current_pgm = None
        self.target_tab = None
        self.base = None
        self.base_remote = None
        self.sync_enabled = False
        self.sync_mode_auto = True
        self.cb_trace_enabled = False

    def init_widget(self):
        dock_handler = DockHandler.getActiveDockHandler()
        parent = dock_handler.parent()
        self.widget = SyncDockWidget.create_widget("ret-sync plugin", parent)
        dock_handler.addDockWidget(self.widget, Qt.BottomDockWidgetArea, Qt.Horizontal, True, False)

    def OnAfterOpenFile(self, context, file, frame):
        self.pgm_mgr.add(file.getRawData().file.original_filename)
        return True

    def OnBeforeCloseFile(self, context, file, frame):
        filename = file.getRawData().file.original_filename
        self.pgm_mgr.remove(filename)

        if Path(filename).name == self.target_tab:
            self.target_tab = None

        return True

    def OnViewChange(self, context, frame, type):
        if frame:
            if frame != self.view_frame:
                self.view_frame = frame
                self.view = frame.getCurrentViewInterface()
                self.data = self.view.getData()
                self.binary_view = self.view_frame.actionContext().binaryView
                self.current_tab = Path(self.binary_view.file.original_filename).name
                self.base = self.binary_view.start

                # attempt to restore the cached remote base
                self.base_remote = self.pgm_mgr.get_base(self.current_tab)
                if self.base_remote:
                    rs_log(f"set remote base: {hex(self.base_remote)}")

                self.pgm_target()
            else:
                pass
                # TODO navigate
        else:
            self.base = None
            self.base_remote = None
            self.view = None
            self.binary_view = None
            self.current_tab = None

    def bootstrap(self, dialect):
        self.pgm_mgr.reset_bases()
        self.widget.set_connected(dialect)

        if dialect in rsconfig.DBG_DIALECTS:
            self.dbg_dialect = rsconfig.DBG_DIALECTS[dialect]
            rs_log("set debugger dialect to %s, enabling hotkeys" % dialect)

    def reset_client(self):
        self.sync_enabled = False
        self.cb_trace_enabled = False
        self.current_pgm = None
        self.widget.reset_client()

    def broadcast(self, msg):
        self.client.send(rs_encode(msg))
        rs_log(msg)

    def set_program(self, pgm):
        self.widget.set_program(pgm)
        if not self.pgm_mgr.exists(pgm):
            return
        self.sync_enabled = True
        self.current_pgm = pgm
        rs_log(f"set current program: {pgm}")
        self.pgm_target_with_lock(pgm)

    def set_program_id(self, index):
        pgm = self.pgm_mgr.get_at(index)
        if pgm:
            self.broadcast(f"> active program is now \"{pgm}\" ({index})")
            self.pgm_target_with_lock(pgm)
        else:
            self.broadcast(f"> idb_n error: index {index} is invalid (see idblist)")

    def pgm_target_with_lock(self, pgm=None):
        self.next_tab_lock.clear()
        self.pgm_target(pgm)
        self.next_tab_lock.wait()

    def restore_tab(self):
        if self.current_tab == self.current_pgm:
            return True

        if not self.pgm_mgr.exists(self.current_pgm):
            return False
        else:
            self.pgm_target_with_lock(self.current_pgm)
            return True

    def pgm_target(self, pgm=None):
        if pgm:
            self.target_tab = pgm

        if not self.target_tab:
            return

        try:
            if self.target_tab != self.current_tab:
                self.trigger_action("Next Tab")
            else:
                self.target_tab = None
                self.next_tab_lock.set()
        except Exception as e:
            rs_log('error while switching tabs')

    def trigger_action(self, action: str):
        handler = UIActionHandler().actionHandlerFromWidget(
            DockHandler.getActiveDockHandler().parent())
        handler.executeAction(action)

    # check if address is within a valid segment
    def is_safe(self, offset):
        return self.binary_view.is_valid_offset(offset)

    # rebase (and update) address with respect to local image base
    def rebase(self, base, offset):
        if base is not None:
            # check for non-compliant debugger client
            if base > offset:
                rs_log('unsafe addr: 0x%x > 0x%x' % (base, offset))
                return None

            # update base address of remote module
            if self.base_remote != base:
                self.pgm_mgr.set_base(self.current_tab, base)
                self.base_remote = base

            dest = self.rebase_local(offset)

        if not self.is_safe(dest):
            rs_log('unsafe addr: 0x%x not in valid segment' % dest)
            return None

        return dest

    # rebase address with respect to local image base
    def rebase_local(self, offset):
        if not (self.base == self.base_remote):
            offset = (offset - self.base_remote) + self.base

        return offset

    # rebase address with respect to remote image base
    def rebase_remote(self, offset):
        if not (self.base == self.base_remote):
            offset = (offset - self.base) + self.base_remote

        return offset

    def set_remote_base(self, rbase):
        self.pgm_mgr.set_base(self.current_tab, rbase)
        self.base_remote = rbase

    def goto(self, base, offset):
        if not self.sync_enabled:
            return

        if self.restore_tab():
            goto_addr = self.rebase(base, offset)
            view = self.binary_view.view
            if not self.binary_view.navigate(view, goto_addr):
                rs_log(f"goto {hex(goto_addr)} error")

            if self.cb_trace_enabled:
                self.color_callback(goto_addr)
        else:
            rs_log('goto: no view available')

    def color_callback(self, hglt_addr):
        blocks = self.binary_view.get_basic_blocks_at(hglt_addr)
        for block in blocks:
            block.function.set_user_instr_highlight(hglt_addr, rsconfig.CB_TRACE_COLOR)

    def get_cursor(self):
        if not self.view_frame:
            return None
        offset = self.view_frame.getCurrentOffset()
        return self.rebase_remote(offset)

    def add_cmt(self, base, offset, cmt):
        cmt_addr = self.rebase(base, offset)
        if cmt_addr:
            in_place = self.binary_view.get_comment_at(cmt_addr)
            if in_place:
                cmt = f"{in_place}\n{cmt}"

            self.binary_view.set_comment_at(cmt_addr, cmt)

    def reset_cmt(self, base, offset):
        cmt_addr = self.rebase(base, offset)
        if cmt_addr:
            self.binary_view.set_comment_at(cmt_addr, '')

    def add_fcmt(self, base, offset, cmt):
        if not self.binary_view:
            return

        cmt_addr = self.rebase(base, offset)
        for fn in self.binary_view.get_functions_containing(cmt_addr):
            fn.comment = cmt

    def commands_available(self):
        if (self.sync_enabled and self.dbg_dialect):
            return True

        rs_log('commands not available')
        return False

    # send a command to the debugger
    def send_cmd(self, cmd, args, oneshot=False):
        if not self.commands_available():
            return

        if cmd not in self.dbg_dialect:
            rs_log(f"{cmd}: unknown command in dialect")
            return

        cmdline = self.dbg_dialect[cmd]
        if args and args != '':
            cmdline += args
        if oneshot and ('oneshot_post' in self.dbg_dialect):
            cmdline += self.dbg_dialect['oneshot_post']

        self.client.send(rs_encode(cmdline))

    def send_cmd_raw(self, cmd, args,):
        if not self.commands_available():
            return

        if 'prefix' in self.dbg_dialect:
            cmd_pre = self.dbg_dialect['prefix']
        else:
            cmd_pre = ''

        cmdline = f"{cmd_pre}{cmd} {args}"
        self.client.send(rs_encode(cmdline))

    def send_simple_cmd(self, cmd):
        self.send_cmd(cmd, '')

    def generic_bp(self, bp_cmd, oneshot=False):
        ui_addr = self.view_frame.getCurrentOffset()
        if not ui_addr:
            rs_log('failed to get cursor location')
            return

        if not self.base_remote:
            rs_log(f"{cmd} failed, remote base of {self.current_pgm} program unknown")
            return

        remote_addr = self.rebase_remote(ui_addr)
        self.send_cmd(bp_cmd, hex(remote_addr), oneshot)

    def cmd_go(self, ctx=None):
        self.send_simple_cmd('go')

    def cmd_si(self, ctx=None):
        self.send_simple_cmd('si')

    def cmd_so(self, ctx=None):
        self.send_simple_cmd('so')

    def cmd_translate(self, ctx=None):
        ui_addr = self.view_frame.getCurrentOffset()
        if not ui_addr:
            rs_log('failed to get cursor location')
            return

        rs_debug(f"translate address {hex(ui_addr)}")
        args = f"{hex(self.base)} {hex(ui_addr)} {self.current_pgm}"
        self.send_cmd_raw("translate", args)

    def cmd_bp(self, ctx=None):
        self.generic_bp('bp')

    def cmd_hwbp(self, ctx=None):
        self.generic_bp('hbp')

    def cmd_bp1(self, ctx=None):
        self.generic_bp('bp1', True)

    def cmd_hwbp1(self, ctx=None):
        self.generic_bp('hbp1', True)

    def cmd_sync(self, ctx=None):
        if not self.pgm_mgr.opened:
            rs_log('please open a tab first')
            return

        if self.client_listener:
            rs_log('already listening')
            return

        local_path = str(self.pgm_mgr.opened[self.current_tab].path)
        self.user_conf = load_configuration(local_path)
        self.client_listener = ClientListenerTask(self)
        self.client_listener.start()

    def cmd_syncoff(self, ctx=None):
        if self.client_listener:
            self.client_listener.cancel()
            self.client_listener = None
            self.widget.reset_status()
        else:
            rs_log('not listening')

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

kloco303 发表于 2024-11-5 14:57
3.9以上吧
kloco303 发表于 2024-11-5 14:58
f"{cmd}: unknown command in dialect  我从字符串处理看看
kloco303 发表于 2024-11-5 15:02
应该3.6以上都可以运行,就看第三方库支持不支持了!
yuth 发表于 2024-11-5 15:12
Python 3.6及以上,f是Python 3.6版本引入的一个新的字符串格式化方法。f是format的缩写
keweiye 发表于 2024-11-5 15:16
用3.10吧
kloco303 发表于 2024-11-5 15:19
yuth 发表于 2024-11-5 15:12
Python 3.6及以上,f是Python 3.6版本引入的一个新的字符串格式化方法。f是format的缩写

print("My name is {} and I am {} years old.".format(name, age))  FORMAT是3.0就引进了吧
kloco303 发表于 2024-11-5 15:53
用3.13
PDchaowu 发表于 2024-11-5 17:28
你要反编译啊。。
Randle 发表于 2024-11-6 20:59
3.8左右,3.11跑不出,py自动分块
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-28 05:48

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表