diff --git a/Framework/MainDriverApi.py b/Framework/MainDriverApi.py index f7734a5f..0757dbce 100644 --- a/Framework/MainDriverApi.py +++ b/Framework/MainDriverApi.py @@ -1147,6 +1147,76 @@ def run_test_case( CommonUtil.CreateJsonReport(TCInfo=after_execution_dict) return "passed" +# for sending variables without dom after command execution +def send_new_variables(): + try: + sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME + variables = [] + max_threshold = 50000 + for var_name in shared.shared_variables: + if var_name.startswith("__") and var_name.endswith("__"): + continue + var_value = shared.shared_variables[var_name] + try: + if len(json.dumps(var_value)) > max_threshold: + builder = SchemaBuilder() + builder.add_object(var_value) + schema = builder.to_schema() + if len(json.dumps(schema)) <= max_threshold: + variables.append({ + "type": "json_schema", + "variable_name": var_name, + "variable_value": schema, + "description": "", + }) + else: + variables.append({ + "type": "json_object", + "variable_name": var_name, + "variable_value": var_value, + "description": "", + }) + except (json.decoder.JSONDecodeError, TypeError): + try: + dir_ = {} + for attr_name in dir(var_value): + if attr_name.startswith('__'): + continue + try: + attr_value = getattr(var_value, attr_name) + dir_[attr_name] = str(type(attr_value)) + except Exception: # ignore getattr errors + pass + variables.append({ + "type": f"non_json: {str(var_value)}", + "variable_name": var_name, + "variable_value": dir_, + "description": "", + }) + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, str(e), 2) + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, str(e), 2) + + dom = None + + data = { + "variables": variables, + "dom_web": {"dom": dom}, + "node_id": shared.Get_Shared_Variables('node_id').lower() + } + res = RequestFormatter.request("post", + RequestFormatter.form_uri("node_ai_contents/"), + data=json.dumps(data), + verify=False + ) + if res.status_code == 500: + CommonUtil.ExecLog(sModuleInfo, res.json()["info"], 2) + elif res.status_code == 404: + CommonUtil.ExecLog(sModuleInfo, 'The chatbot API does not exist, server upgrade needed', 2) + return + except Exception as e: + CommonUtil.ExecLog(sModuleInfo, str(e), 2) def send_dom_variables(): try: @@ -1949,7 +2019,7 @@ def main(device_dict, all_run_id_info): shared.Set_Shared_Variables("zeuz_enable_variable_logging", "False") shared.Set_Shared_Variables("run_id", run_id) - shared.Set_Shared_Variables("node_id", CommonUtil.MachineInfo().getLocalUser()) + shared.Set_Shared_Variables("node_id", CommonUtil.MachineInfo().getLocalUser(), True) # so node id can't be changed and variable updates are in sync send_log_file_only_for_fail = ConfigModule.get_config_value("RunDefinition", "upload_log_file_only_for_fail") send_log_file_only_for_fail = False if send_log_file_only_for_fail.lower() == "false" else True diff --git a/Framework/Utilities/repl_service.py b/Framework/Utilities/repl_service.py new file mode 100644 index 00000000..976ede37 --- /dev/null +++ b/Framework/Utilities/repl_service.py @@ -0,0 +1,207 @@ +import json +import copy +import ssl +import traceback +from threading import Thread +import io +import contextlib +import sys + +import websocket +import time + +from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr +from Framework.MainDriverApi import send_new_variables + + +ws = None +connected = False +_stop = False + + + +def _send(msg): + global ws + try: + if ws is None: + return + if not isinstance(msg, str): + msg = json.dumps(msg) + ws.send(msg) + except Exception: + pass + + +def close(): + global ws, connected, _stop + connected = False + _stop = True + if ws is not None: + try: + ws.close(status=1000, reason="Closing REPL") + except Exception: + pass + + +def on_message(ws, message): + try: + data = json.loads(message) + except Exception: + print(f"[REPL] on_message non-JSON frame ignored: {message[:120]}") + return + if not isinstance(data, dict) or data.get("type") != "command": + return + + code = data.get("msg", "") + # snapshot protected values + protected_list = [] + protected_snapshot = {} + pre_existing = set() + try: + protected_list = list(getattr(sr, "protected_variables", []) or []) + for name in protected_list: + if name in sr.shared_variables: + protected_snapshot[name] = copy.deepcopy(sr.shared_variables[name]) + pre_existing.add(name) + except Exception: + pass + output_text = "" + error_text = None + _preview = code[:200].replace("\n", "\\n") + print("[REPL] received command:", _preview) + + buf = io.StringIO() + try: + with contextlib.redirect_stdout(buf): + # Try eval first for expressions + try: + result = None + try: + result = eval(code, sr.shared_variables, sr.shared_variables) + except SyntaxError: + # Not an expression so execute block + exec(code, sr.shared_variables, sr.shared_variables) + except NameError as ne: + ident = code.strip() + # see if single identifier referencing shared variable + if ident.isidentifier(): + if ident in sr.shared_variables: + result = sr.shared_variables[ident] + else: + raise + else: + raise + if result is not None: + print(result) + except Exception: + raise + output_text = buf.getvalue().strip() + except Exception: + error_text = traceback.format_exc() + finally: + buf.close() + print("[REPL] execution completed:", output_text) + + # restore protected values if tampered + tampered = [] + try: + for name in protected_list: + if name in pre_existing: + pre_val = protected_snapshot.get(name, None) + if name not in sr.shared_variables or sr.shared_variables.get(name) != pre_val: + sr.shared_variables[name] = pre_val + tampered.append(name) + else: + # remove if did not exist before + if name in sr.shared_variables: + try: + del sr.shared_variables[name] + except Exception: + sr.shared_variables.pop(name, None) + tampered.append(name) + except Exception: + pass + + if error_text: + _send({"type": "error", "msg": error_text}) + else: + # add warning line if any protected var was tampered + if tampered: + if output_text: + output_text = output_text + "\n" + "\n".join( + f"(read-only) Reverted attempt to modify {n}" for n in tampered + ) + else: + output_text = "\n".join(f"(read-only) Reverted attempt to modify {n}" for n in tampered) + _send({"type": "output", "msg": output_text}) + + # republish variables back to server so UI can refresh + try: + send_new_variables() + except Exception: + pass + + # Signal completion so UI can refresh variables after execution fully finishes + try: + _send({"type": "output", "msg": "__done__"}) + except Exception: + pass + + +def on_error(ws, error): + print(f"[REPL] on_error: {error}") + return + + +def on_close(ws=None, _a=None, _b=None): + global connected + connected = False + print("[REPL] connection closed") + + +def on_open(ws): + global connected + connected = True + print("[REPL] on_open: connected, sending status ping") + try: + _send({"type": "output", "msg": "__status__:node_online"}) + except Exception: + pass + + +def _run_loop(url): + global ws, _stop + while not _stop: + try: + ws = websocket.WebSocketApp( + url, + on_message=on_message, + on_error=on_error, + on_close=on_close, + ) + ws.on_open = on_open + + ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=20, ping_timeout=10) + + except Exception as e: + print(f"[REPL] exception in run loop: {e}") + if _stop: + break + time.sleep(5) + + +def connect(url): + global connected, _stop + try: + _stop = False + print(f"[REPL] connect() invoked url={url}") + sys.stdout.flush() + t = Thread(target=_run_loop, args=(url,)) + t.daemon = True + t.start() + except Exception as outer: + print(f"[REPL] connect() exception: {outer}") + sys.stdout.flush() + +def ping_state(): + return {"connected": connected, "ws_is_none": ws is None, "stop": _stop} diff --git a/node_cli.py b/node_cli.py index 1a1e04a5..18f675a0 100755 --- a/node_cli.py +++ b/node_cli.py @@ -66,6 +66,9 @@ def adjust_python_path(): ) from Framework.Utilities import ConfigModule # noqa: E402 from Framework.Utilities import live_log_service # noqa: E402 + +from Framework.Utilities import repl_service # noqa: E402 + from Framework.node_server_state import STATE, LoginCredentials # noqa: E402 from server import main as node_server # noqa: E402 @@ -382,6 +385,17 @@ def live_log_service_addr(): protocol = "ws" server_addr = f"{protocol}://{server_url.netloc}" return f"{server_addr}/faster/v1/ws/live_log/send/{node_id}" + + def repl_service_addr(): + server_url = urlparse( + ConfigModule.get_config_value("Authentication", "server_address") + ) + if server_url.scheme == "https": + protocol = "wss" + else: + protocol = "ws" + server_addr = f"{protocol}://{server_url.netloc}" + return f"{server_addr}/faster/v1/ws/repl/send/{node_id}" def deploy_srv_addr(): server_url = urlparse( @@ -392,6 +406,9 @@ def deploy_srv_addr(): # Connect to the live log service. live_log_service.connect(live_log_service_addr()) + # Connect to the REPL service. + repl_service.connect(repl_service_addr()) + # WARNING: For local development only. # if "localhost" in host: # deploy_srv_addr = deploy_srv_addr.replace("8000", "8300")