#!/usr/bin/python3
# -*- encoding: Utf-8 -*-
"""Scan a binary for embedded protobuf file descriptors and dump them.

Run as a script, every descriptor found in the input binary is converted
with ``descpb_to_proto`` and written below a ``proto`` directory located
next to this file.
"""
import sys
from pathlib import Path

from google.protobuf.descriptor_pb2 import FileDescriptorProto
from google.protobuf.internal.decoder import _DecodeVarint
from google.protobuf.message import DecodeError

from extract.descpb_to_proto import descpb_to_proto

# Tag bytes (field_number << 3 | wire_type) accepted after the "name" field
# of a serialized file descriptor: fields 2 through 12, listed in ascending
# field order. 0x50 and 0x58 are varint fields, the others length-delimited.
_FOLLOWING_TAGS = b"\x12\x1a\x22\x2a\x32\x3a\x42\x4a\x50\x58\x62"

# How far back from the end of the file name the (1, length-delimited)
# marker is searched for.
_MAX_NAME_LOOKBEHIND = 1024

# Descriptors whose name starts with one of these prefixes are not written
# out by the script entry point.
_SKIPPED_PREFIXES = ("google", "grpc", "github.com/golang/protobuf")


def _descriptor_end(data, pos):
    """Return the offset just past the run of descriptor fields at *pos*.

    Fields are walked for as long as their tag byte is one of
    ``_FOLLOWING_TAGS`` and field numbers do not decrease. Returns ``None``
    when a varint cannot be decoded or when a field would run past the end
    of *data*, i.e. when the candidate is truncated or not a descriptor.
    """
    size = len(data)
    allowed = _FOLLOWING_TAGS
    try:
        while pos < size and data[pos] in allowed:
            tag = data[pos]
            # Only this tag and later ones may follow.
            allowed = allowed[allowed.index(tag):]

            value, pos = _DecodeVarint(data, pos + 1)
            if tag & 0b111 == 2:
                # Length-delimited field: the varint is the payload size.
                pos += value
    except (IndexError, DecodeError):
        return None
    return pos if pos <= size else None


def walk_binary(binr):
    """Yield the converted form of each file descriptor embedded in a binary.

    Args:
        binr: Path of the binary file to scan; it is read fully into memory.

    Yields:
        The result of ``descpb_to_proto`` for every candidate that parses
        as a ``FileDescriptorProto``. The script entry point unpacks each
        result as a ``(name, proto_text)`` pair.

    Candidates that turn out to be truncated or that fail to parse are
    skipped, and scanning resumes right after the matched file name.
    """
    with open(binr, "rb") as fd:
        data = fd.read()
    size = len(data)

    # Search for:
    # ".proto" or ".protodevel", as part of the "name" (1) field
    cursor = 0
    while cursor < size:
        cursor = data.find(b".proto", cursor)
        if cursor == -1:
            break
        cursor += len(b".proto")
        if data[cursor:cursor + 5] == b"devel":
            cursor += 5
        # `cursor` stays at the end of the name until a descriptor is
        # accepted, so every `continue` below resumes scanning from here.
        name_end = cursor

        # Search back for the (1, length-delimited) marker
        start = data.rfind(
            b"\x0a", max(name_end - _MAX_NAME_LOOKBEHIND, 0), name_end
        )
        if start == -1:
            continue
        # A 10-byte name has a length byte equal to the 0x0a tag byte; in
        # that case the byte found is the length and the tag sits before it.
        if start > 0 and data[start - 1] == 0x0A == (name_end - start - 1):
            start -= 1

        # Check whether length byte is coherent
        try:
            name_length, name_start = _DecodeVarint(data, start + 1)
        except (IndexError, DecodeError):
            continue
        if name_end - name_start != name_length:
            continue

        # Look just after for subsequent markers; a name that ends exactly
        # at the end of the file has nothing after it.
        if name_end >= size or data[name_end] not in _FOLLOWING_TAGS:
            continue
        end = _descriptor_end(data, name_end)
        if end is None:
            continue

        # Parse descriptor
        proto = FileDescriptorProto()
        try:
            proto.ParseFromString(data[start:end])
        except DecodeError:
            # False positive: bytes that looked like a descriptor but are not.
            continue
        cursor = end

        # Convert to ascii
        yield descpb_to_proto(proto)


def main(argv=None):
    """Write every descriptor found in a binary below the ``proto`` directory.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. The first argument, when present, is the path
            of the binary to scan; otherwise ``./mgt`` is used.
    """
    args = sys.argv[1:] if argv is None else argv
    binary = args[0] if args else "./mgt"

    base = Path(__file__).resolve().parent
    proto_base = base / "proto"
    proto_base.mkdir(exist_ok=True)
    # Resolved separately so the containment check below also works when
    # the output directory is itself a symlink.
    proto_root = proto_base.resolve()

    for name, proto in walk_binary(binary):
        if name.startswith(_SKIPPED_PREFIXES):
            continue
        p = proto_base / name
        # Names come from the scanned binary: refuse absolute paths and
        # ".." components that would write outside the output directory.
        if proto_root not in p.resolve().parents:
            print("skipping unsafe path:", name, file=sys.stderr)
            continue
        p.parent.mkdir(parents=True, exist_ok=True)
        print(p)
        with open(p, "w", encoding="utf-8") as fd:
            fd.write(proto)


if __name__ == "__main__":
    main()