from __future__ import annotations
from collections import namedtuple
from copy import deepcopy
import numpy as np
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
from qiskit.circuit import CircuitInstruction, Instruction, Qubit
from qiskit.converters import circuit_to_dag, dag_to_circuit
from qiskit_aer import AerSimulator
from QCut.backend_utility import transpile_subcircuits
from QCut.cutlocation import CutLocation, SingleQubitCutLocation
from QCut.qcuterror import QCutError
from QCut.QCutFind import construct_final_subcircuits
from QCut.wirecut import (
estimate_expectation_values,
get_experiment_circuits,
run_experiments,
)
BitLocations = namedtuple("BitLocations", ("index", "registers"))
def _get_cut_locations(circuit):
index = 0 # index of the current instruction in circuit_data
circuit_data = circuit.data
cut_locations = np.array([])
# loop through circuit instructions
# if operation is a Cut() instruction remove it and add registers and
# offset index to cut_locations
# rename varibales to be more descriptive (namely qs)
while index < len(circuit):
op = circuit_data[index]
if "Cut" in op.operation.name:
# find qubits for Cut operation
qubits = [
circuit.find_bit(qubit).registers[0]
for qubit in op.qubits
]
# remove the cut operation
circuit_data.remove(op)
# append to cut_locations
if len(qubits) == 1:
cut_locations = np.append(
cut_locations, SingleQubitCutLocation((qubits[0], index))
)
elif len(qubits) == 2:
cut_locations = np.append(
cut_locations, CutLocation((qubits, index))
)
else:
raise QCutError("Cannot cut gates with more that 2 qubits." \
"Transpile circuit to only contain 2 qubit gates.")
# adjust index to account for removed operation
index -= 1
index += 1
return cut_locations
class NonCommutingGate(Instruction):
def __init__(self, name="Init_1"):
super().__init__(name=name, num_qubits=1, num_clbits=0, params=[])
self._opaque = True
def __repr__(self):
return f"{self.name}"
def _insert_cut_nodes(circuit, cut_locations):
circuit_data = circuit.data
cut_index = 0
offset = 0
for cut_location in cut_locations:
measure_node = NonCommutingGate(f"Meas_{cut_index}")
initialize_node = NonCommutingGate(f"Init_{cut_index}")
cut_czc = NonCommutingGate(f"cutCZ_c_{cut_index}")
cut_czt = NonCommutingGate(f"cutCZ_t_{cut_index}")
cut_index += 1
cur_ops = (measure_node, initialize_node) if isinstance(cut_location,
SingleQubitCutLocation) else (cut_czc, cut_czt)
if isinstance(cut_location, SingleQubitCutLocation):
for ph_op in cur_ops:
circuit_data.insert(
cut_location.index + offset,
CircuitInstruction(
operation=ph_op,
qubits=[Qubit(cut_location.qubits[0], cut_location.qubits[1])],
),
)
offset += 1
else:
circuit_data.insert(
cut_location.index + offset,
CircuitInstruction(
operation=cur_ops[0],
qubits=[Qubit(cut_location.qubits[0][0],
cut_location.qubits[0][1])],
),
)
offset += 1
circuit_data.insert(
cut_location.index + offset,
CircuitInstruction(
operation=cur_ops[1],
qubits=[Qubit(cut_location.qubits[1][0],
cut_location.qubits[1][1])],
),
)
offset += 1
return circuit
def _move_to_new_wire(orig: QuantumCircuit) -> QuantumCircuit:
# Create the new circuit and add registers
offset = 0
new = QuantumCircuit(name=orig.name + "_rebuilt")
for creg in orig.cregs:
new.add_register(ClassicalRegister(creg.size, creg.name))
for qreg in orig.qregs:
new.add_register(QuantumRegister(0, qreg.name))
# Create fresh Qubit objects for each original wire
# and record a mapping old_qubit -> new_qubit
qubit_map = {}
new_qubits = []
for idx, q_old in enumerate(orig.qubits):
q_new = Qubit()
new_qubits.append(q_new)
qubit_map[q_old] = q_new
new.add_bits(new_qubits)
# 3) Replay every instruction, splitting on Measure
for inst, qargs, cargs in orig.data:
# map every qarg via our current mapping
mapped_qs = [qubit_map[q] for q in qargs]
if inst.name.startswith("Meas"):
# append this measurement on the current wire
new.append(inst, mapped_qs, cargs)
# now allocate a fresh wire for all future uses of qargs[0]
q_fresh = Qubit()
new.add_bits([q_fresh])
new.qubits.remove(q_fresh)
new.qubits.insert(orig.find_bit(qargs[0]).index+1+offset, q_fresh)
offset += 1
# update the mapping so q_old -> q_fresh going forward
qubit_map[qargs[0]] = q_fresh
else:
# just copy the gate over to mapped_qs
new.append(inst, mapped_qs, cargs)
return new
def count_gates(qc: QuantumCircuit):
gate_count = dict.fromkeys(qc.qubits, 0)
for gate in qc.data:
for qubit in gate.qubits:
gate_count[qubit] += 1
return gate_count
def _remove_idle_wires(qc: QuantumCircuit):
qc_out = deepcopy(qc)
gate_count = count_gates(qc_out)
for qubit, count in gate_count.items():
if count == 0:
qc_out.qubits.remove(qubit)
for i in qc_out.qregs:
if qubit in i._bits:
i._bits.remove(qubit)
qc_out.qregs[0]._bit_indices = {
qubit: qc_out.qubits.index(qubit) for qubit in qc_out.qubits
}
qc_out.qregs[0]._bits = qc_out.qubits
qc_out.qregs[0]._size = len(qc_out.qregs[0]._bits)
return qc_out
def _separate_subcircuits(circuit):
dag = circuit_to_dag(circuit)
circs = dag.separable_circuits()
new_circs = []
for i in circs:
circ = _remove_idle_wires(dag_to_circuit(i))
if len(circ.qubits) == 0:
continue
new_circs.append(circ)
return new_circs
def _add_cbits(subcircuits):
for circ in subcircuits:
clbits = 0
clbits_qpd = 0
for i in circ:
name = i.operation.name
if "Meas" in name:
clbits_qpd += 1
elif "cut" in name:
clbits += 1
clbits_qpd += 1
circ.add_register(ClassicalRegister(clbits_qpd, "qpd_meas"))
circ.add_register(ClassicalRegister(circ.num_qubits - clbits_qpd
+ clbits, "meas"))
return subcircuits
def get_qubit_map(subcircuits: list[QuantumCircuit]):
def filter_obs_i(qc_data):
return [i for i in qc_data if "obs" in i.operation.name]
def sort_func(obs):
return int(obs.operation.name.split("_")[1])
map_qubit = {}
count = 0
for ind, i in enumerate(reversed(subcircuits)):
for j in sorted(filter_obs_i(i.data), key=sort_func, reverse=True):
map_qubit[int(j.operation.name.split("_")[1])] = count
count += 1
return map_qubit
[docs]
def get_locations_and_subcircuits(
circuit: QuantumCircuit,
max_qubits: list[int] | None = None,
):
"""Get cut locations and subcircuits with placeholder operations.
Args:
circuit (QuantumCircuit): circuit with cuts inserted
max_qubits (list[int], optional):
list of maximum qubits per subcircuit when using automatic cut
finding. If None, no constraint is used. Defaults to None.
In general it is not necesary to manually specify this parameter.
Returns:
tuple: A tuple containing:
- list[SingleQubitCutLocation]: Locations of the cuts as a list
- list[QuantumCircuit]: Subcircuits with placeholder operations
- dict[int:int]: map of subcircuit qubit indices to original circuit
qubit indices
"""
circuit = circuit.copy() # copy to avoid modifying the original circuit
circuit = circuit.decompose(["CutGate"])
for i in range(circuit.num_qubits):
obs_m = QuantumCircuit(1, name=f"obs_{i}")
obs_m = obs_m.to_instruction()
circuit.append(obs_m, [i])
cut_locations = _get_cut_locations(circuit)
circuit1 = _insert_cut_nodes(circuit, cut_locations)
circuit = _move_to_new_wire(circuit1.copy())
subcircuits = _separate_subcircuits(circuit)
subcircuits = _add_cbits(subcircuits)
fixed_circs = []
for i in subcircuits:
test = QuantumCircuit(i.num_qubits)
test.add_register(i.cregs[0])
test.add_register(i.cregs[1])
for j in i.data:
qubits = [test.qubits[i.qubits.index(q)] for q in j.qubits]
test.append(CircuitInstruction(j.operation, qubits))
fixed_circs.append(test)
if len(fixed_circs) <= 1:
raise QCutError(
"Invalid cuts. Check documentation to see how cuts should be placed."
)
if max_qubits and len(fixed_circs) != len(max_qubits):
"""if max_qubits is None:
raise QCutError(
"max_qubits must be specified when automatic cut finding with " \
"max_qubits constraint is used."
)"""
fixed_circs = construct_final_subcircuits(fixed_circs, max_qubits)
map_qubits = get_qubit_map(fixed_circs)
return cut_locations, fixed_circs, map_qubits
[docs]
def run_cut_circuit(
subcircuits: list[QuantumCircuit],
cut_locations: np.ndarray[SingleQubitCutLocation],
observables: list[int | list[int]],
map_qubits: dict[int, int],
backend=AerSimulator(),
) -> np.ndarray[float]:
"""After splitting the circuit run the rest of the circuit knitting sequence.
Args:
subcircuits (list[QuantumCircuit]):
subcircuits containing the placeholder operations
cut_locations (np.ndarray[CutLocation]): list of cut locations
observables (list[int | list[int]]):
list of observables as qubit indices (Z observable)
backend: backend to use for running experiment circuits (optional)
Returns:
list: a list of expectation values
"""
if not isinstance(backend, AerSimulator):
transpiled_subcircuits = transpile_subcircuits(subcircuits,
cut_locations, backend)
(subexperiments,
coefs,
id_meas) = get_experiment_circuits(transpiled_subcircuits,
cut_locations)
else:
(subexperiments,
coefs,
id_meas) = get_experiment_circuits(subcircuits,
cut_locations)
results = run_experiments(
subexperiments,
cut_locations,
id_meas=id_meas,
backend=backend,
)
return estimate_expectation_values(
results, coefs, cut_locations, observables, map_qubits
)
[docs]
def run(
circuit: QuantumCircuit,
observables: list[int, list[int]],
backend=AerSimulator(),
) -> list[float]:
"""Run the whole circuit knitting sequence with one function call.
Args:
circuit (QuantumCircuit): circuit with cut experiments
observables (list[int | list[int]]):
list of observbles in the form of qubit indices (Z-obsevable).
backend: backend to use for running experiment circuits (optional)
Returns:
list: a list of expectation values
"""
# circuit = circuit.copy()
qss, circs, map_qubits = get_locations_and_subcircuits(circuit)
return run_cut_circuit(circs, qss, observables, map_qubits, backend)