diff --git a/changelog.md b/changelog.md index 1e9e72cd..ff3eba3b 100644 --- a/changelog.md +++ b/changelog.md @@ -10,6 +10,7 @@ Features Bug Fixes --------- * Force a prompt_toolkit refresh after fzf history search to avoid display glitches. +* Include `status` footer in paged output. 1.57.0 (2026/02/25) diff --git a/mycli/main.py b/mycli/main.py index 03a2418d..00904399 100755 --- a/mycli/main.py +++ b/mycli/main.py @@ -934,29 +934,25 @@ def output_res(results: Generator[SQLResult], start: float) -> None: nonlocal mutating result_count = watch_count = 0 for result in results: - title = result.title - cur = result.results - headers = result.headers - status = result.status - command = result.command - logger.debug("title: %r", title) - logger.debug("headers: %r", headers) - logger.debug("rows: %r", cur) - logger.debug("status: %r", status) + logger.debug("title: %r", result.title) + logger.debug("headers: %r", result.headers) + logger.debug("rows: %r", result.results) + logger.debug("status: %r", result.status) + logger.debug("command: %r", result.command) threshold = 1000 # If this is a watch query, offset the start time on the 2nd+ iteration # to account for the sleep duration - if command is not None and command["name"] == "watch": + if result.command is not None and result.command["name"] == "watch": if watch_count > 0: try: - watch_seconds = float(command["seconds"]) + watch_seconds = float(result.command["seconds"]) start += watch_seconds except ValueError as e: self.echo(f"Invalid watch sleep time provided ({e}).", err=True, fg="red") sys.exit(1) else: watch_count += 1 - if is_select(status) and isinstance(cur, Cursor) and cur.rowcount > threshold: + if is_select(result.status) and isinstance(result.results, Cursor) and result.results.rowcount > threshold: self.echo( f"The result set has more than {threshold} rows.", fg="red", @@ -974,9 +970,10 @@ def output_res(results: Generator[SQLResult], start: float) -> None: max_width = None formatted = self.format_output( - title, - cur, - headers, + result.title, + result.results, + result.headers, + result.postamble, special.is_expanded_output(), special.is_redirected(), self.null_string, @@ -990,7 +987,7 @@ def output_res(results: Generator[SQLResult], start: float) -> None: if result_count > 0: self.echo("") try: - self.output(formatted, status) + self.output(formatted, result.status) except KeyboardInterrupt: pass if self.beep_after_seconds > 0 and t >= self.beep_after_seconds: @@ -1002,20 +999,17 @@ def output_res(results: Generator[SQLResult], start: float) -> None: start = time() result_count += 1 - mutating = mutating or is_mutating(status) + mutating = mutating or is_mutating(result.status) # get and display warnings if enabled - if self.show_warnings and isinstance(cur, Cursor) and cur.warning_count > 0: + if self.show_warnings and isinstance(result.results, Cursor) and result.results.warning_count > 0: warnings = sqlexecute.run("SHOW WARNINGS") for warning in warnings: - title = warning.title - cur = warning.results - headers = warning.headers - status = warning.status formatted = self.format_output( - title, - cur, - headers, + warning.title, + warning.results, + warning.headers, + warning.postamble, special.is_expanded_output(), special.is_redirected(), self.null_string, @@ -1024,7 +1018,7 @@ def output_res(results: Generator[SQLResult], start: float) -> None: max_width, ) self.echo("") - self.output(formatted, status) + self.output(formatted, warning.status) def keepalive_hook(_context): """ @@ -1556,15 +1550,13 @@ def run_query( self.log_query(query) results = self.sqlexecute.run(query) for result in results: - title = result.title - cur = result.results - headers = result.headers self.main_formatter.query = query self.redirect_formatter.query = query output = self.format_output( - title, - cur, - headers, + result.title, + result.results, + result.headers, + result.postamble, special.is_expanded_output(), special.is_redirected(), self.null_string, @@ -1576,16 +1568,14 @@ def run_query( click.echo(line, nl=new_line) # get and display warnings if enabled - if self.show_warnings and isinstance(cur, Cursor) and cur.warning_count > 0: + if self.show_warnings and isinstance(result.results, Cursor) and result.results.warning_count > 0: warnings = self.sqlexecute.run("SHOW WARNINGS") for warning in warnings: - title = warning.title - cur = warning.results - headers = warning.headers output = self.format_output( - title, - cur, - headers, + warning.title, + warning.results, + warning.headers, + warning.postamble, special.is_expanded_output(), special.is_redirected(), self.null_string, @@ -1603,6 +1593,7 @@ def format_output( title: str | None, cur: Cursor | list[tuple] | None, headers: list[str] | str | None, + postamble: str | None, expanded: bool = False, is_redirected: bool = False, null_string: str | None = None, @@ -1633,7 +1624,7 @@ def format_output( # will run before preprocessors defined as part of the format in cli_helpers output_kwargs["preprocessors"] = (preprocessors.convert_to_undecoded_string,) - if title: # Only print the title if it's not None. + if title: output = itertools.chain(output, [title]) if headers or (cur and title): @@ -1684,6 +1675,9 @@ def get_col_type(col) -> type: output = itertools.chain(output, formatted) + if postamble: + output = itertools.chain(output, [postamble]) + return output def get_reserved_space(self) -> int: diff --git a/mycli/packages/special/dbcommands.py b/mycli/packages/special/dbcommands.py index 482807dc..e4b73cb8 100644 --- a/mycli/packages/special/dbcommands.py +++ b/mycli/packages/special/dbcommands.py @@ -173,4 +173,5 @@ def status(cur: Cursor, **_) -> list[SQLResult]: footer.append("\n" + stats_str) footer.append("--------------") - return [SQLResult(title="\n".join(title), results=output, headers="", status="\n".join(footer))] + + return [SQLResult(title="\n".join(title), results=output, headers="", postamble="\n".join(footer))] diff --git a/mycli/packages/sqlresult.py b/mycli/packages/sqlresult.py index 9572ea44..99d1bb1d 100644 --- a/mycli/packages/sqlresult.py +++ b/mycli/packages/sqlresult.py @@ -8,6 +8,7 @@ class SQLResult: title: str | None = None results: Cursor | list[tuple] | None = None headers: list[str] | str | None = None + postamble: str | None = None status: str | None = None command: dict[str, str | float] | None = None @@ -15,4 +16,4 @@ def __iter__(self): return self def __str__(self): - return f"{self.title}, {self.results}, {self.headers}, {self.status}, {self.command}" + return f"{self.title}, {self.results}, {self.headers}, {self.postamble}, {self.status}, {self.command}" diff --git a/test/test_main.py b/test/test_main.py index 5a5b29c6..fc8b3a9b 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -67,6 +67,7 @@ def test_binary_display_hex(executor, capsys): sqlresult.title, sqlresult.results, sqlresult.headers, + sqlresult.postamble, False, False, "", @@ -106,6 +107,7 @@ def test_binary_display_utf8(executor, capsys): sqlresult.title, sqlresult.results, sqlresult.headers, + sqlresult.postamble, False, False, "", diff --git a/test/test_tabular_output.py b/test/test_tabular_output.py index a4ff3819..f01bd304 100644 --- a/test/test_tabular_output.py +++ b/test/test_tabular_output.py @@ -51,7 +51,7 @@ def description(self): assert list(mycli.change_table_format("sql-update")) == [SQLResult(status="Changed table format to sql-update")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" - output = mycli.format_output(None, FakeCursor(), headers, False, False) + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) actual = "\n".join(output) assert actual == dedent("""\ UPDATE `DUAL` SET @@ -67,10 +67,10 @@ def description(self): , `binary` = 0xaabb WHERE `letters` = 'd';""") # Test sql-update-2 output format - assert list(mycli.change_table_format("sql-update-2")) == [SQLResult(None, None, None, "Changed table format to sql-update-2")] + assert list(mycli.change_table_format("sql-update-2")) == [SQLResult(status="Changed table format to sql-update-2")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" - output = mycli.format_output(None, FakeCursor(), headers, False, False) + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) assert "\n".join(output) == dedent("""\ UPDATE `DUAL` SET `optional` = NULL @@ -83,36 +83,71 @@ def description(self): , `binary` = 0xaabb WHERE `letters` = 'd' AND `number` = 456;""") # Test sql-insert output format (without table name) - assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(status="Changed table format to sql-insert")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" - output = mycli.format_output(None, FakeCursor(), headers, False, False) + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) assert "\n".join(output) == dedent("""\ INSERT INTO `DUAL` (`letters`, `number`, `optional`, `float`, `binary`) VALUES ('abc', 1, NULL, 10.0e0, 0xaa) , ('d', 456, '1', 0.5e0, 0xaabb) ;""") # Test sql-insert output format (with table name) - assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(status="Changed table format to sql-insert")] mycli.main_formatter.query = "SELECT * FROM `table`" mycli.redirect_formatter.query = "SELECT * FROM `table`" - output = mycli.format_output(None, FakeCursor(), headers, False, False) + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) assert "\n".join(output) == dedent("""\ INSERT INTO table (`letters`, `number`, `optional`, `float`, `binary`) VALUES ('abc', 1, NULL, 10.0e0, 0xaa) , ('d', 456, '1', 0.5e0, 0xaabb) ;""") # Test sql-insert output format (with database + table name) - assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(status="Changed table format to sql-insert")] mycli.main_formatter.query = "SELECT * FROM `database`.`table`" mycli.redirect_formatter.query = "SELECT * FROM `database`.`table`" - output = mycli.format_output(None, FakeCursor(), headers, False, False) + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) assert "\n".join(output) == dedent("""\ INSERT INTO database.table (`letters`, `number`, `optional`, `float`, `binary`) VALUES ('abc', 1, NULL, 10.0e0, 0xaa) , ('d', 456, '1', 0.5e0, 0xaabb) ;""") # Test binary output format is a hex string - assert list(mycli.change_table_format("psql")) == [SQLResult(None, None, None, "Changed table format to psql")] - output = mycli.format_output(None, FakeCursor(), headers, False, False) + assert list(mycli.change_table_format("psql")) == [SQLResult(status="Changed table format to psql")] + output = mycli.format_output(None, FakeCursor(), headers, None, False, False) assert '0xaabb' in '\n'.join(output) + + +@dbtest +def test_postamble_output(mycli): + """Test the postamble output property.""" + headers = ['letters', 'number', 'optional', 'float'] + + class FakeCursor: + def __init__(self): + self.data = [('abc', 1, None, 10.0)] + self.description = [ + (None, FIELD_TYPE.VARCHAR), + (None, FIELD_TYPE.LONG), + (None, FIELD_TYPE.LONG), + (None, FIELD_TYPE.FLOAT), + ] + + def __iter__(self): + return self + + def __next__(self): + if self.data: + return self.data.pop(0) + else: + raise StopIteration() + + def description(self): + return self.description + + postamble = 'postamble:\nfooter content' + mycli.change_table_format('ascii') + mycli.main_formatter.query = '' + output = mycli.format_output(None, FakeCursor(), headers, postamble, False, False) + actual = "\n".join(output) + assert actual.endswith(postamble)