Skip to content

Fix use with Sequoia Chameleon #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 21, 2024
Merged
Prev Previous commit
Use unrestricted connection for HAVEKEY --list and KEYINFO --list
These commands are forbidden over a restricted connection to the agent,
but GnuPG wars if they are not present and Sequoia Chameleon requires
them.  Fortunately, they are trivial to sanitize input for, so there is
near-zero risk of an injection vulnerability.  Therefore, use a separate
unrestricted agent connection for these commands.  Also use a separate
function to read agent hello messages sent upon connection.
  • Loading branch information
DemiMarie committed Dec 19, 2024
commit 2129a1287494148e9839e0219e9d191cb6308410
106 changes: 73 additions & 33 deletions splitgpg2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
commands: Dict[bytes, 'NoneCallback']
seen_data: bool
config_loaded: bool
agent_unrestricted_socket_path: Optional[str]
agent_socket_path: Optional[str]
agent_reader: Optional[asyncio.StreamReader]
agent_writer: Optional[asyncio.StreamWriter]
Expand Down Expand Up @@ -245,6 +246,7 @@
'seen_data',
'config_loaded',
'agent_socket_path',
'agent_unrestricted_socket_path',
'agent_reader',
'agent_writer',
'source_keyring_dir',
Expand Down Expand Up @@ -275,6 +277,7 @@

self.log = logging.getLogger('splitgpg2.Server')
self.agent_socket_path = None
self.agent_unrestricted_socket_path = None
self.agent_reader: Optional[asyncio.StreamReader] = None
self.agent_writer: Optional[asyncio.StreamWriter] = None

Expand Down Expand Up @@ -456,14 +459,20 @@

dirs = subprocess.check_output(
['gpgconf', *self.homedir_opts(), '--list-dirs', '-o/dev/stdout'])
if self.allow_keygen:
socket_field = b'agent-socket:'
else:
socket_field = b'agent-extra-socket'
unrestricted_socket_field = b'agent-socket'
socket_field = unrestricted_socket_field if self.allow_keygen else b'agent-extra-socket'
# search for agent-socket:/run/user/1000/gnupg/S.gpg-agent
agent_socket_path = [d.split(b':', 1)[1] for d in dirs.splitlines()
if d.startswith(socket_field)][0]
self.agent_socket_path = agent_socket_path.decode()
for d in dirs.splitlines():
key, value = d.split(b':')
if key == socket_field:
self.agent_socket_path = value.decode("UTF-8", "surrogateescape")
if key == unrestricted_socket_field:
self.agent_unrestricted_socket_path = value.decode("UTF-8", "surrogateescape")
if ((self.agent_unrestricted_socket_path is not None) and
(self.agent_socket_path is not None)):
break
else:
raise RuntimeError("bad output from gpgconf")

Check warning on line 475 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L475

Added line #L475 was not covered by tests

self.agent_reader, self.agent_writer = await asyncio.open_unix_connection(
path=self.agent_socket_path)
Expand All @@ -472,7 +481,7 @@
self.notify('connected')

# wait for agent hello
await self.handle_agent_response({})
self.client_write(await self.read_hello(self.agent_reader))

def close(self, reason: str, log_level: int = logging.ERROR) -> None:
self.log.log(log_level, '%s; Closing!', reason)
Expand Down Expand Up @@ -559,8 +568,8 @@
b'lc-messages': (OptionHandlingType.fake, b'OK'),
b'putenv': (OptionHandlingType.fake, b'OK'),
b'pinentry-mode': (OptionHandlingType.fake, b'ERR 67108924 Not supported <GPG Agent>'),
b'allow-pinentry-notify': (OptionHandlingType.verify, None),
b'agent-awareness': (OptionHandlingType.verify, b'2.1.0')
b'allow-pinentry-notify': (OptionHandlingType.fake, b'OK'),
b'agent-awareness': (OptionHandlingType.verify, b'2.1.0'),
}

@staticmethod
Expand Down Expand Up @@ -649,17 +658,17 @@
raise Filtered
if allow_list and untrusted_args.startswith(b'--list'):
if untrusted_args == b'--list':
return b'--list'

Check warning on line 661 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L661

Added line #L661 was not covered by tests
if untrusted_args[6] == 61: # ASCII '='
# 1000 is the default value used by gpg2
return b'--list=%d' % sanitize_int(untrusted_args[7:], 1, 1000)
raise Filtered

Check warning on line 665 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L665

Added line #L665 was not covered by tests
untrusted_args_list: List[bytes] = untrusted_args.split(b' ')
if not (min_count <= len(untrusted_args_list) <= max_count):
raise Filtered
for untrusted_arg in untrusted_args_list:
if len(untrusted_arg) != 40 or not _hash_regex.match(untrusted_arg):
raise Filtered

Check warning on line 671 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L671

Added line #L671 was not covered by tests
return b' '.join(untrusted_args_list)

def sanitize_key_desc(self, untrusted_args: bytes) -> bytes:
Expand Down Expand Up @@ -692,8 +701,8 @@

if untrusted_args == b'pinentry-mode=ask':
# This is the default and a no-op
self.fake_respond(b'OK')
return

Check warning on line 705 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L704-L705

Added lines #L704 - L705 were not covered by tests

untrusted_name, untrusted_value = extract_args(untrusted_args, b'=')
try:
Expand Down Expand Up @@ -738,11 +747,13 @@
raise Filtered
# upper keygrip limit is arbitary
args = self.verify_keygrip_arguments(1, 200, untrusted_args, True)
await self.send_agent_command(b'HAVEKEY', args)
unrestricted = args.startswith(b'--list') and not self.allow_keygen
await self.send_agent_command(b'HAVEKEY', args, unrestricted)

async def command_KEYINFO(self, untrusted_args: Optional[bytes]) -> None:
args = self.verify_keygrip_arguments(1, 1, untrusted_args, True)
await self.send_agent_command(b'KEYINFO', args)
unrestricted = args.startswith(b'--list') and not self.allow_keygen
await self.send_agent_command(b'KEYINFO', args, unrestricted)

async def command_GENKEY(self, untrusted_args: Optional[bytes]) -> None:
if not self.allow_keygen:
Expand Down Expand Up @@ -817,7 +828,8 @@
key.fingerprint,
subkey_desc)

self.agent_write(b'SETKEYDESC %s\n' % self.percent_plus_escape(desc))
assert self.agent_writer is not None, "no writer?"
self.agent_write(b'SETKEYDESC %s\n' % self.percent_plus_escape(desc), self.agent_writer)

assert self.agent_reader is not None
untrusted_line = await self.agent_reader.readline()
Expand Down Expand Up @@ -916,7 +928,7 @@
async def command_NOP(self, untrusted_args: Optional[bytes]) -> None:
# Ignores all arguments.
# pylint: disable=unused-argument
self.fake_respond(b'OK')

Check warning on line 931 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L931

Added line #L931 was not covered by tests

async def command_PKDECRYPT(self, untrusted_args: Optional[bytes]) -> None:
if untrusted_args is not None:
Expand Down Expand Up @@ -953,7 +965,7 @@
if untrusted_args is not None:
if not untrusted_args.startswith(b'-- '):
raise Filtered
if self.cache_nonce_regex.match(untrusted_args[3:]) is None:

Check warning on line 968 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L968

Added line #L968 was not covered by tests
raise Filtered
args = untrusted_args

Expand Down Expand Up @@ -1014,48 +1026,73 @@
}
return {}

async def send_agent_command(self, command: bytes, args: Optional[bytes]) -> None:
async def send_agent_command(self, command: bytes, args: Optional[bytes],
unrestricted: bool=False) -> None:
""" Sends command to local gpg agent and handle the response """
expected_inquires = self.get_inquires_for_command(command)
if args:
if not self.command_argument_regex.match(args):
raise AssertionError("BUG: corrupt command about to be sent to agent!")
cmd_with_args = command + b' ' + args + b'\n'
assert self.agent_reader is not None, "no reader?"
assert self.agent_writer is not None, "no writer?"
if unrestricted and not self.allow_keygen:
reader, writer = await asyncio.open_unix_connection(
self.agent_unrestricted_socket_path)
await self.read_hello(reader)
else:
cmd_with_args = command + b'\n'
self.agent_write(cmd_with_args)
while True:
more_expected = await self.handle_agent_response(
expected_inquires=expected_inquires)
if not more_expected:
break
reader, writer = self.agent_reader, self.agent_writer
try:
if args:
if not self.command_argument_regex.match(args):
raise AssertionError("BUG: corrupt command about to be sent to agent!")

Check warning on line 1044 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1044

Added line #L1044 was not covered by tests
cmd_with_args = command + b' ' + args + b'\n'
else:
cmd_with_args = command + b'\n'
self.agent_write(cmd_with_args, writer)
while True:
more_expected = await self.handle_agent_response(expected_inquires, reader)
if not more_expected:
break
finally:
if reader is not self.agent_reader:
writer.close()

def agent_write(self, data: bytes) -> None:
writer = self.agent_writer
async def read_hello(self, agent_reader: asyncio.StreamReader) -> bytes:
while True:
line = await agent_reader.readline()
if not line.endswith(b'\n'):
raise ProtocolError("premature EOF from agent connection")

Check warning on line 1061 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1061

Added line #L1061 was not covered by tests
if b'\n' in line[:-1]:
raise ProtocolError("newline in readline() result???")

Check warning on line 1063 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1063

Added line #L1063 was not covered by tests
if line.startswith(b'#'):
continue

Check warning on line 1065 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1065

Added line #L1065 was not covered by tests
if line == b'OK' or line.startswith(b'OK '):
return line
raise ProtocolError("agent responded with something other than 'OK'"

Check warning on line 1068 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1068

Added line #L1068 was not covered by tests
" to initial connection")

def agent_write(self, data: bytes, writer: asyncio.StreamWriter) -> None:
assert writer is not None, 'agent_write called with no agent writer?'
self.log_io('A <<<', data)
writer.write(data)

async def handle_agent_response(self,
expected_inquires: Dict[bytes, 'ArgCallback']) -> bool:
expected_inquires: Dict[bytes, 'ArgCallback'],
agent_reader: asyncio.StreamReader) -> bool:
""" Receive and handle one agent response. Return whether there are
more expected """
assert self.agent_reader is not None
assert self.client_writer is not None
if self.client_writer.is_closing():
# If something went wrong, agent might send back junk.
# Discard all remaining data from agent and return.
while await self.agent_reader.read(1024):
while await agent_reader.read(1024):
pass
return False
# We generally consider the agent as trusted. But since the client can
# determine part of the response we handle this here as untrusted.
untrusted_line = await self.agent_reader.readline()
untrusted_line = await agent_reader.readline()
untrusted_line = untrusted_line.rstrip(b'\n')
self.log_io('A >>>', untrusted_line)
if untrusted_line.startswith(b'#'):
# Comment, ignore
return True

Check warning on line 1095 in splitgpg2/__init__.py

View check run for this annotation

Codecov / codecov/patch

splitgpg2/__init__.py#L1095

Added line #L1095 was not covered by tests
untrusted_res, untrusted_args = extract_args(untrusted_line)
if untrusted_res in (b'D', b'S'):
# passthrough to the client
Expand Down Expand Up @@ -1291,7 +1328,9 @@
raise Filtered from e
args = untrusted_sexp

self.agent_write(b'D ' + self.escape_D(self.serialize_sexpr(args)) + b'\n')
assert self.agent_writer is not None, "no writer?"
self.agent_write(b'D ' + self.escape_D(self.serialize_sexpr(args)) + b'\n',
self.agent_writer)
self.seen_data = True
return True

Expand Down Expand Up @@ -1389,7 +1428,8 @@
async def inquire_command_END(self, *, untrusted_args: bytes) -> bool:
if untrusted_args:
raise Filtered('unexpected arguments to END')
self.agent_write(b'END\n')
assert self.agent_writer is not None, "no writer?"
self.agent_write(b'END\n', self.agent_writer)
return False

# endregion
Expand Down