import argparse
import os

import requests
# This utility file is used to test chunk upload functionality.
# Usage:
# Full Scan:
# python api/tests/test_upload_in_chunks.py --branch branchX --repo_url https://github.com/repo123 --sha 21jio112j3 --project_name projectX /path/to/your/file.zip
# Partial Scan:
# python api/tests/test_upload_in_chunks.py --branch branchX --repo_url https://github.com/repo123 --sha 21jio112j3 --project_name projectX --files_to_scan "vuln.py, test.py" --partial_scan true /path/to/your/file.zip
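#
# Upload flow exercised by this script:
#   1. POST file metadata plus a blank chunk to start the upload and receive a transfer_id.
#   2. PATCH each chunk to <API_BASE_URL>/<transfer_id>/ with Upload-Offset, Upload-Length and Upload-Name headers.
#   3. HEAD <API_BASE_URL>/<transfer_id>/ to read the server's current Upload-Offset (used to resume an interrupted upload).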
# Configuration
API_BASE_URL = "https://www.corgea.app/api/v1/start-scan"
url_base_params = {"scan_type": "blast"}
token_headers = {"CORGEA-TOKEN": "<YOUR_TOKEN>"}
LARGE_CHUNK_SIZE = 5 * 1024 * 1024  # 5 MB chunks
SMALL_CHUNK_SIZE = int(0.01 * 1024 * 1024)  # ~10 KB chunks (0.01 MB)
CHUNK_SIZE = LARGE_CHUNK_SIZE
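# Switch CHUNK_SIZE to SMALL_CHUNK_SIZE to exercise uploads made up of many small chunks.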

def initiate_upload(file_path):
    """Initiate the chunked upload by sending file metadata and a blank chunk."""
    file_name = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)
    metadata = {
        'file_name': file_name,
        'file_size': file_size,
    }
    print(f"Sending metadata for {file_name}: {metadata}")
    blank_chunk = b''
    files = {'files': (file_name, blank_chunk)}
    # Send the metadata as multipart form fields alongside the blank chunk;
    # requests silently drops a json= body when files= is also given.
    response = requests.post(API_BASE_URL, params=url_base_params, files=files, headers=token_headers, data=metadata)
    if response.status_code == 200:
        upload_response = response.json()
        transfer_id = upload_response.get("transfer_id")
        print(f"Upload initiated. Transfer ID: {transfer_id}. Message: {upload_response.get('message')}")
        return transfer_id
    else:
        print(f"Failed to initiate upload: {response.text}")
        return None

def upload_chunk(file_path, transfer_id, chunk_offset, chunk_data, project_name, branch, repo_url, sha, partial_scan, files_to_scan):
    """Upload a single chunk."""
    # Chunk position and total size are sent as resumable-upload headers.
    headers = {
        'Upload-Offset': str(chunk_offset),
        'Upload-Length': str(os.path.getsize(file_path)),
        'Upload-Name': os.path.basename(file_path),
    }
    print(f"headers: {headers}")
    headers.update(token_headers)
    form_data = {
        'project_name': project_name,
        'branch': branch,
        'repo_url': repo_url,
        'sha': sha,
        'partial_scan': partial_scan,
        'files_to_scan': files_to_scan,
    }
    print(f"form_data: {form_data}")
    files = {
        'chunk_data': ('chunk', chunk_data, 'application/octet-stream'),
    }
    response = requests.patch(
        f"{API_BASE_URL}/{transfer_id}/",
        headers=headers,
        params=url_base_params,
        files=files,
        data=form_data,
    )
    if response.status_code == 200:
        print(f"Upload progress: {chunk_offset / os.path.getsize(file_path) * 100:.2f}% ({chunk_offset})")
        return True, response.json(), response.headers
    print(f"Failed to upload chunk at offset {chunk_offset}: {response.text}")
    # The error body may not be JSON, so don't try to parse it here.
    return False, {}, response.headers

def check_upload_status(transfer_id):
    """Check the status of the current upload."""
    response = requests.head(f"{API_BASE_URL}/{transfer_id}/", params=url_base_params, headers=token_headers)
    if response.status_code == 200:
        offset = response.headers.get('Upload-Offset')
        print(f"Current upload offset from response header 'Upload-Offset': {offset}")
        return int(offset) if offset is not None else None
    print(f"Failed to check upload status: {response.text}")
    return None

def upload_file_in_chunks(file_path, project_name, branch, repo_url, sha, partial_scan=False, files_to_scan=None):
    """Upload a file in chunks, resuming from the server's reported offset if one exists."""
    file_size = os.path.getsize(file_path)
    transfer_id = initiate_upload(file_path)
    if not transfer_id:
        return
    # Resume from wherever the server says we left off (0 for a fresh upload).
    offset = check_upload_status(transfer_id) or 0
    remote_offset = 0
    print(f"Uploading file in chunks. Total size: {file_size}")
    with open(file_path, 'rb') as f:
        f.seek(offset)
        while offset < file_size and remote_offset < file_size:
            chunk_data = f.read(CHUNK_SIZE)
            success, response, response_headers = upload_chunk(file_path, transfer_id, offset, chunk_data, project_name, branch, repo_url, sha, partial_scan, files_to_scan)
            if not success:
                print("Aborting upload due to an error.")
                return
            offset += len(chunk_data)
            print(f"Expected to have written {offset}")
            if response_headers.get('Upload-Offset'):
                remote_offset = int(response_headers.get('Upload-Offset', offset))
    print(f"Upload progress: 100% ({file_size}) {remote_offset}")
    print(f"File {file_path} uploaded successfully!")
    print(f"Scan ID: {response.get('scan_id')}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Upload a large file in chunks.")
    parser.add_argument(
        "file",
        type=str,
        help="Path to the file to be uploaded."
    )
    parser.add_argument(
        "--branch",
        type=str,
        required=False,
        help="Branch of the repository."
    )
    parser.add_argument(
        "--repo_url",
        type=str,
        required=False,
        help="URL of the repository."
    )
    parser.add_argument(
        "--sha",
        type=str,
        required=False,
        help="SHA of the commit to scan."
    )
    parser.add_argument(
        "--project_name",
        type=str,
        required=False,
        help="Name of the project."
    )
    parser.add_argument(
        "--partial_scan",
        # argparse's type=bool treats any non-empty string (including "false") as True,
        # so parse the documented "true"/"false" values explicitly.
        type=lambda value: value.lower() in ("true", "1", "yes"),
        required=False,
        help="True if this is a partial scan."
    )
    parser.add_argument(
        "--files_to_scan",
        type=str,
        required=False,
        help="Comma-separated list of files to scan."
    )
    args = parser.parse_args()
    file_to_upload = args.file
    if not os.path.isfile(file_to_upload):
        print(f"Error: File '{file_to_upload}' does not exist.")
    else:
        print("Parameters received:")
        print(f"File to upload: {file_to_upload}")
        print(f"Project name: {args.project_name}")
        print(f"Branch: {args.branch}")
        print(f"Repo URL: {args.repo_url}")
        print(f"Files to scan: {args.files_to_scan}")
        print(f"Partial scan: {args.partial_scan}")
        upload_file_in_chunks(file_to_upload, args.project_name, args.branch, args.repo_url, args.sha, args.partial_scan, args.files_to_scan)