What I’m trying do is the following: I have objects for task-offloading and objects connect to each other for task delivery and each has a r_max capacity(beta_switch stands for on/off connection between objects). If the task selects a minimal path for task delivery it increments task_table by one. Also lower_than_r_max represents if the task_table is full or not. After task_table increment, it determines new paths. If I try to express lower_than_r_max(i) in terms of M_Big, it solves(optimally) but cannot show the solved values of task_table array. I have 4 tasks for processing (arrival_times) , task_table is [3,0,0] initially and r_max=7. I expect the program to distribute tasks to 2nd and 3rd objects. Here is my code:
“”"
UAV trajectory and task assignment optimization — CVXPY version.
Converted from MATLAB CVX; all indices use 0-based loops (tested).
“”"
import numpy as np
import cvxpy as cp
import matplotlib.pyplot as plt
from typing import List, Dict, Any, Tuple, Optional, cast
# ---------------------------------------------------------------------------
# Parameters (match MATLAB)
# ---------------------------------------------------------------------------
M = 3
D = 1e8
K = 4
w = np.array([
[305, 5],
[7.5, 315],
[-314, 5],
[5, -313],
])
GREAT_NUMBER = 1e9
P_max = 0.1
rho_0 = 10 ** (-60 / 10)
tau_num = 100
sigma_squ = 10 ** (-110 / 10)
V_max = 50
eps = 1e-4
T = 200
delta_T = 10
N = int(T / delta_T) # 20
S_max = 100
d_min = 30
H = 100
L = 3 # depth
arrival_times = np.array([0.0, 20, 40, 56])
num_tasks = len(arrival_times)
r_max = 5
B = 1e8
M_big = 1e6
tol = 1e-9
# Path-selection big-M constraints can over-constrain; set True to enable (may be infeasible).
USE_PATH_SELECTION_CONSTRAINTS = False
def transfer_time(
M_dim: int,
n: int,
first_uav: int,
second_uav: int,
sigma_squ: float,
rho_0: float,
P: np.ndarray,
H: float,
Q: np.ndarray,
w: np.ndarray,
D: float,
B: float,
) → Tuple[float, float]:
“”"
Compute transfer time and rate between two UAVs at time slot n.
Returns (trans_time, rate_sum). Uses 0-based UAV indices.
"""
rate_sum = 0.0
q_first = Q[first_uav, n, :]
q_second = Q[second_uav, n, :]
dist_sq = np.sum((q_first - q_second) ** 2)
denom = sigma_squ
for j in range(M_dim):
if j != first_uav:
q_j = Q[j, n, :]
dist_j_sq = np.sum((q_first - q_j) ** 2)
denom += P[j, n] * rho_0 / (H**2 + dist_j_sq)
sinr = P[first_uav, n] * rho_0 / (H**2 + dist_sq) / denom
rate_raw_first = np.log2(1 + sinr)
if first_uav == second_uav:
return 0.0, 0.0
trans_time = 1.0 / (rate_raw_first / D * B)
rate_sum = rate_raw_first
return trans_time, rate_sum
def build_propagation_paths(
lower_than_r_max: np.ndarray,
n: int,
dist_matrix: np.ndarray,
r_max_val: int,
L_depth: int,
M_dim: int,
depth: int,
acc_dist: Any,
prev_uav: int,
current_seq: List[int],
) → List[Dict[str, Any]]:
“”"
Recursive helper: build propagation paths. Returns list of dicts with
'total_dist', 'last_uav', 'sequence'. All indices 0-based.
"""
if depth > 1 and lower_than_r_max[prev_uav]:
return [{“total_dist”: acc_dist, “last_uav”: prev_uav, “sequence”: current_seq}]
if depth > L_depth:
return [{“total_dist”: acc_dist, “last_uav”: prev_uav, “sequence”: current_seq}]
path_data = []
for uav_idx in range(M_dim):
if uav_idx == prev_uav:
continue
step_dist = dist_matrix[prev_uav, uav_idx]
if isinstance(acc_dist, (int, float)) and acc_dist == 0:
new_acc = step_dist
else:
new_acc = acc_dist + step_dist
child_paths = build_propagation_paths(
lower_than_r_max,
n,
dist_matrix,
r_max_val,
L_depth,
M_dim,
depth + 1,
new_acc,
uav_idx,
current_seq + [uav_idx],
)
path_data.extend(child_paths)
if not path_data:
return [{“total_dist”: acc_dist, “last_uav”: prev_uav, “sequence”: current_seq}]
return path_data
def build_initial_Q() → np.ndarray:
“”“Initial waypoints for 3 UAVs, shape (3, 21, 2). 0-based.”“”
Q = np.zeros((3, 21, 2))
Q[0, :, :] = [
[300, 10], [240, 60], [180, 120], [120, 180], [60, 240], [10, 300],
[-60, 240], [-120, 180], [-180, 120], [-240, 60], [-300, 10],
[-240, -60], [-180, -120], [-120, -180], [-60, -240], [10, -300],
[60, -240], [120, -180], [180, -120], [240, -60], [300, 10],
]
Q[1, :, :] = [
[0, 10], [0, 60], [0, 120], [0, 180], [0, 240], [10, 307],
[60, 300], [120, 300], [180, 300], [240, 300], [310, 300],
[300, 240], [300, 180], [300, 120], [300, 60], [300, 10],
[240, 0], [180, 0], [120, 0], [60, 0], [0, 10],
]
Q[2, :, :] = [
[0, -10], [0, -60], [0, -120], [0, -180], [0, -240], [10, -307],
[-60, -300], [-120, -300], [-180, -300], [-240, -300], [-310, -300],
[-300, -240], [-300, -180], [-300, -120], [-300, -60], [-300, -10],
[-240, 0], [-180, 0], [-120, 0], [-60, 0], [0, -10],
]
return Q
def build_initial_P() → np.ndarray:
“”“P shape (M, N) with P_max.”“”
return np.full((M, N), P_max)
def run_one_iteration(Q: np.ndarray, P: np.ndarray) → Tuple[Optional[float], Optional[dict]]:
“”"
Run one CVXPY optimization iteration. Returns (objective_value, solution_dict or None).
solution_dict includes: add_table, beta_switch, max_dist_ind, lower_than_r_max, paths.
"""
alpha_switch = cp.Variable((M, N, K), name=“alpha_switch”)
beta_switch = cp.Variable((N, M, M), name=“beta_switch”, nonneg=True)
add_table = cp.Variable(M, boolean=True, name=“add_table”)
max_dist_ind = cp.Variable((N, num_tasks), name=“max_dist_ind”, nonneg=True)
task_table_arr = cp.Variable((N, num_tasks, M), name=“task_table_arr”)
task_table = cp.Variable((M), integer=True, name=“task_table”)
task_table.value = np.array([3, 0, 0])
trans_time_nij = np.zeros((N, M, M))
add_table_arr = cp.Variable((N,num_tasks,M), boolean=True, name=“add_table_arr”)
#lower_than_r_max = cp.Variable((M), boolean=True, name=“lower_than_r_max”)
lower_than_r_max = np.array([True, True, True])
for n in range(N):
for i in range(M):
for j in range(M):
trans_time_nij[n, i, j], _ = transfer_time(
M, n, i, j, sigma_squ, rho_0, P, H, Q, w, D, B
)
dist_matrix_expr = {}
for n in range(N):
for i in range(M):
for j in range(M):
t = trans_time_nij[n, i, j]
dist_matrix_expr[(n, i, j)] = t * (
1 + GREAT_NUMBER * (1 - beta_switch[n, i, j])
)
r_max_table = np.array([r_max] * M)
pathlist_per_nk: List[List[List[Dict]]] = []
num_paths_nk = np.zeros((N, num_tasks), dtype=int)
max_dist_p_expr: List[List[cp.Expression]] = [
[cp.Constant(0.0) for _ in range(num_tasks)] for _ in range(N)
]
# Symbolic running task_table (expression) and constraints list.
constraints: List[Any] = []
for n in range(N):
pathlist_per_nk.append([])
dist_n = np.zeros((M, M), dtype=object)
for i in range(M):
for j in range(M):
dist_n[i, j] = dist_matrix_expr[(n, i, j)]
for k in range(num_tasks):
paths = []
start_uav = 0
paths_from_start = build_propagation_paths(
lower_than_r_max,
n,
dist_n,
r_max,
L,
M,
depth=1,
acc_dist=0.0,
prev_uav=start_uav,
current_seq=[start_uav],
)
paths.extend(paths_from_start)
pathlist_per_nk[n].append(paths)
num_paths_nk[n, k] = len(paths)
if len(paths) == 0:
max_dist_p_expr[n][k] = cp.Constant(0.0)
else:
path_dists = [p[“total_dist”] for p in paths]
max_dist_p_expr[n][k] = (
path_dists[0] if len(path_dists) == 1 else cp.maximum(*path_dists)
)
task_table = task_table + add_table
#lower_than_r_max = (r_max_table - 1) >= task_table
# Link CVXPY Variable task_table_arr to the symbolic task_table expression
constraints.append(add_table_arr[n, k,:] == add_table[:])
constraints.append(task_table_arr[n, k, :] == task_table[:])
paths_summary: List[List[List[Dict[str, Any]]]] = [
\[{"sequence": p\["sequence"\], "last_uav": p\["last_uav"\]} *for* p *in* paths_k\]
*for* paths_k *in* paths_n
for paths_n in pathlist_per_nk
]
total_sum_time_n = 0.0
for n in range(N):
for k in range(num_tasks):
total_sum_time_n += max_dist_ind[n, k]
constraints = []
for n in range(N):
for k in range(num_tasks):
mp = max_dist_p_expr[n][k]
constraints.append(max_dist_ind[n, k] >= mp)
constraints.append(cp.sum(add_table) == 1)
#task_table_final = task_table_init + num_tasks * add_table
for i in range(M):
constraints.append(
task_table[i] <= r_max_table[i]
)
constraints.append(r_max_table[i] >= task_table[i] + 1e-5 - M_big * (1 - lower_than_r_max[i]))
constraints.append(r_max_table[i] <= task_table[i] + M_big * lower_than_r_max[i])
if USE_PATH_SELECTION_CONSTRAINTS:
for n in range(N):
paths_n = pathlist_per_nk[n]
for k in range(num_tasks):
paths = paths_n[k]
if len(paths) == 0:
continue
path_dists = [p[“total_dist”] for p in paths]
for p_idx, path in enumerate(paths):
L_uav = path[“last_uav”]
d_p = path_dists[p_idx]
for q_idx, _ in enumerate(paths):
if p_idx == q_idx:
continue
d_q = path_dists[q_idx]
constraints.append(
d_p - d_q <= M_big * (1 - add_table[L_uav])
)
for n in range(N):
for i in range(M):
constraints.append(cp.sum(beta_switch[n, i, :]) <= 2)
constraints.append(beta_switch[n, i, i] == 0)
for j in range(M):
constraints.append(beta_switch[n, i, j] <= 1)
constraints.append(beta_switch[n, i, j] >= 0)
prob = cp.Problem(cp.Minimize(total_sum_time_n), constraints)
try:
prob.solve(verbose=True)
if prob.status in (“optimal”, “optimal_inaccurate”):
assert prob.value is not None
obj_val = float(cast(float, prob.value))
add_val = np.array(add_table.value).flatten()
ltr_val = np.array(lower_than_r_max).flatten()
task_table_arr_val = np.array(task_table_arr.value)
add_table_arr_val = np.array(add_table_arr.value)
return obj_val, {
“add_table”: add_val,
“beta_switch”: beta_switch.value,
“max_dist_ind”: max_dist_ind.value,
“lower_than_r_max”: ltr_val,
“paths”: paths_summary,
“task_table_arr”: task_table_arr_val,
“add_table_arr”: add_table_arr_val,
}
except Exception as e:
print(“Solve failed:”, e)
return None, None
def main() → None:
Q = build_initial_Q()
P = build_initial_P()
plt.figure(figsize=(8, 8))
plt.plot(Q[0, :, 0], Q[0, :, 1], “-*”, label=“UAV-1”)
plt.plot(Q[1, :, 0], Q[1, :, 1], “-s”, label=“UAV-2”)
plt.plot(Q[2, :, 0], Q[2, :, 1], “-+”, label=“UAV-3”)
plt.xlabel(“x”)
plt.ylabel(“y”)
plt.title(“Double-UAV Initial Trajectory”)
plt.legend()
plt.grid(True)
plt.gcf().set_size_inches(8, 8)
plt.tight_layout()
plt.savefig(“uav_initial_trajectory.png”, dpi=150)
plt.close()
print(“Saved uav_initial_trajectory.png”)
val, sol = run_one_iteration(Q, P)
if val is not None and sol is not None:
print(“Objective:”, val)
add = np.array(sol[“add_table”]).reshape(-1)
ltr = np.array(sol[“lower_than_r_max”]).reshape(-1)
task_table_arr = np.array(sol[“task_table_arr”])
add_table_arr = np.array(sol[“add_table_arr”])
paths = sol[“paths”]
print(“add_table (solution):”, add)
print(“lower_than_r_max (solution):”, ltr)
print(“Per-task values (k: lower_than_r_max, add_table, task_table):”)
for n in range(N):
for k in range(num_tasks):
print(f" task {k}: task_table={task_table_arr[n,k,:]}")
print(f" task {k}: add_table_arr={add_table_arr[n,k,:]}")
else:
print(“Optimization did not solve to optimal.”)
if _name_ == “_main_”:
main()