ElectLeaders API (KIP-460) implemented #1818

Merged: 14 commits, Oct 9, 2024
36 changes: 34 additions & 2 deletions examples/adminapi.py
@@ -26,7 +26,7 @@
                                   AclOperation, AclPermissionType, AlterConfigOpType,
                                   ScramMechanism, ScramCredentialInfo,
                                   UserScramCredentialUpsertion, UserScramCredentialDeletion,
                                   OffsetSpec)
                                   OffsetSpec, ElectionType)
import sys
import threading
import logging
@@ -878,6 +878,36 @@ def example_delete_records(a, args):
                  f" before offset {partition.offset}: {e}")


def example_elect_leaders(a, args):
    partitions = []
    if (len(args) - 1) % 2 != 0:
        raise ValueError("Invalid number of arguments for elect_leaders. Expected format: " +
                         "elect_leaders <election_type> [<topic1> <partition1>" +
                         " <topic2> <partition2> ..]")

    try:
        election_type = ElectionType[args[0]]
    except KeyError:
        raise ValueError(f"Invalid election_type: {args[0]}, expected 'PREFERRED' or 'UNCLEAN'")

    for topic, partition in zip(args[1::2], args[2::2]):
        partitions.append(TopicPartition(topic, int(partition)))

    # No explicit partitions given: pass None to elect leaders for all topic partitions.
    if len(partitions) == 0:
        partitions = None

    f = a.elect_leaders(election_type, partitions)
    try:
        results = f.result()
        for partition, exception in results.items():
            if exception is None:
                print(f"Leader election successful for topic: '{partition.topic}'" +
                      f" partition: '{partition.partition}'")
            else:
                print("Leader election failed for topic: " +
                      f"'{partition.topic}' partition: '{partition.partition}': {exception}")
    except KafkaException as e:
        print(f"Error electing leaders: {e}")


if __name__ == '__main__':
    if len(sys.argv) < 3:
        sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -917,6 +947,7 @@ def example_delete_records(a, args):
        sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
                         '[<topic2> <partition2> <offset_spec2> ..]\n')
        sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
        sys.stderr.write(' elect_leaders <election_type> [<topic1> <partition1> <topic2> <partition2> ..]\n')
        sys.exit(1)

    broker = sys.argv[1]
@@ -947,7 +978,8 @@ def example_delete_records(a, args):
              'describe_user_scram_credentials': example_describe_user_scram_credentials,
              'alter_user_scram_credentials': example_alter_user_scram_credentials,
              'list_offsets': example_list_offsets,
              'delete_records': example_delete_records,
              'elect_leaders': example_elect_leaders}

    if operation not in opsmap:
        sys.stderr.write('Unknown operation: %s\n' % operation)
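For orientation, the new example is invoked like the other adminapi.py operations. The broker address and topic below are placeholders, not values taken from this PR:

    python examples/adminapi.py localhost:9092 elect_leaders PREFERRED my_topic 0
    python examples/adminapi.py localhost:9092 elect_leaders PREFERRED

The second form passes no topic/partition pairs, which the example maps to None, i.e. a preferred election across all topic partitions.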
53 changes: 53 additions & 0 deletions src/confluent_kafka/admin/__init__.py
@@ -54,6 +54,8 @@
from ._listoffsets import (OffsetSpec,  # noqa: F401
                           ListOffsetsResultInfo)

from ._election import (ElectionType)  # noqa: F401

from ._records import DeletedRecords  # noqa: F401

from .._model import TopicCollection as _TopicCollection
@@ -548,6 +550,22 @@ def _check_delete_records(request):
            if req.partition < 0:
                raise ValueError("'partition' cannot be negative")

    @staticmethod
    def _check_elect_leaders(election_type, partitions):
        if not isinstance(election_type, ElectionType):
            raise TypeError("Expected 'election_type' to be of type 'ElectionType'")
        if partitions is not None:
            if not isinstance(partitions, list):
                raise TypeError("Expected 'partitions' to be a list, got " +
                                f"'{type(partitions).__name__}'")
            for partition in partitions:
                if not isinstance(partition, _TopicPartition):
                    raise TypeError("Elements of the 'partitions' list must be of type 'TopicPartition', " +
                                    f"got '{type(partition).__name__}'")
                if partition.partition < 0:
                    raise ValueError("Elements of the 'partitions' list must not have a negative value " +
                                     "for the 'partition' field")

    def create_topics(self, new_topics, **kwargs):
        """
        Create one or more new topics.
@@ -1258,3 +1276,38 @@ def delete_records(self, topic_partition_offsets, **kwargs):

        super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs)
        return futmap

    def elect_leaders(self, election_type, partitions=None, **kwargs):
        """
        Perform Preferred or Unclean leader election for
        all the specified topic partitions.

        :param ElectionType election_type: The type of election to perform.
        :param List[TopicPartition]|None partitions: The topic partitions to perform
                  the election on. Use ``None`` to perform the election on all
                  topic partitions.
        :param float request_timeout: The overall request timeout in seconds,
                  including broker lookup, request transmission, operation time
                  on broker, and response. Default: `socket.timeout.ms/1000.0`
        :param float operation_timeout: The operation timeout in seconds,
                  controlling how long the 'elect_leaders' request will block
                  on the broker waiting for the election to propagate
                  in the cluster. A value of 0 returns immediately.
                  Default: `socket.timeout.ms/1000.0`

        :returns: A future. The result() method of the future returns
                  dict[TopicPartition, KafkaException|None].

        :rtype: future

        :raises KafkaException: Operation failed locally or on broker.
        :raises TypeError: Invalid input type.
        :raises ValueError: Invalid input value.
        """

        AdminClient._check_elect_leaders(election_type, partitions)

        f = AdminClient._create_future()

        super(AdminClient, self).elect_leaders(election_type.value, partitions, f, **kwargs)

        return f
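A minimal usage sketch of the new public method, assuming a reachable broker at localhost:9092 and a topic named my_topic (both placeholders). As documented above, each value in the result dict is either None on success or a KafkaException describing the per-partition failure.

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import AdminClient, ElectionType

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})

    # Preferred leader election for two partitions of a hypothetical topic.
    future = admin.elect_leaders(
        ElectionType.PREFERRED,
        [TopicPartition("my_topic", 0), TopicPartition("my_topic", 1)],
    )

    for tp, err in future.result().items():
        status = "ok" if err is None else f"failed: {err}"
        print(f"{tp.topic} [{tp.partition}]: {status}")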
29 changes: 29 additions & 0 deletions src/confluent_kafka/admin/_election.py
@@ -0,0 +1,29 @@
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from .. import cimpl as _cimpl


class ElectionType(Enum):
    """
    Enumerates the different types of leader elections.
    """
    PREFERRED = _cimpl.ELECTION_TYPE_PREFERRED  #: Preferred election
    UNCLEAN = _cimpl.ELECTION_TYPE_UNCLEAN  #: Unclean election

    def __lt__(self, other):
        if self.__class__ != other.__class__:
            return NotImplemented
        return self.value < other.value
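As a small illustration (not part of the diff), the enum can be addressed by attribute or by name; name lookup is what the adminapi.py example uses to parse its command-line argument, and the wrapped integer value is what AdminClient.elect_leaders() hands to the C binding.

    from confluent_kafka.admin import ElectionType

    # Name lookup and attribute access return the same member.
    assert ElectionType["PREFERRED"] is ElectionType.PREFERRED

    # The underlying integer constant comes from the cimpl module and is what
    # gets passed down to librdkafka.
    print(ElectionType.UNCLEAN.value)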
119 changes: 119 additions & 0 deletions src/confluent_kafka/src/Admin.c
@@ -3059,6 +3059,104 @@ const char Admin_delete_records_doc[] = PyDoc_STR(
        "\n"
        " This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n");

/**
 * @brief Elect leaders
 */
PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) {
        PyObject *election_type = NULL, *partitions = NULL, *future;
        rd_kafka_ElectLeaders_t *c_elect_leaders = NULL;
        rd_kafka_ElectionType_t c_election_type;
        struct Admin_options options = Admin_options_INITIALIZER;
        rd_kafka_AdminOptions_t *c_options = NULL;
        rd_kafka_topic_partition_list_t *c_partitions = NULL;
        CallState cs;
        rd_kafka_queue_t *rkqu;

        static char *kws[] = {"election_type",
                              "partitions",
                              "future",
                              /* options */
                              "request_timeout", "operation_timeout", NULL};

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OOO|ff", kws,
                                         &election_type, &partitions, &future,
                                         &options.request_timeout,
                                         &options.operation_timeout)) {
                goto err;
        }

        c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ELECTLEADERS,
                                       &options, future);
        if (!c_options) {
                goto err; /* Exception raised by options_to_c() */
        }

        /* options_to_c() sets future as the opaque, which is used in the
         * background_event_cb to set the results on the future as the
         * admin operation is finished, so we need to keep our own refcount. */
        Py_INCREF(future);

        c_election_type = (rd_kafka_ElectionType_t)cfl_PyInt_AsInt(election_type);

        if (partitions != Py_None && !PyList_Check(partitions)) {
                PyErr_SetString(PyExc_ValueError,
                                "partitions must be None or a list");
                goto err;
        }

        if (partitions != Py_None) {
                c_partitions = py_to_c_parts(partitions);
                if (!c_partitions)
                        goto err; /* Exception raised by py_to_c_parts() */
        }

        c_elect_leaders = rd_kafka_ElectLeaders_new(c_election_type, c_partitions);

        if (c_partitions) {
                rd_kafka_topic_partition_list_destroy(c_partitions);
        }

        /* Use librdkafka's background thread queue to automatically dispatch
         * Admin_background_event_cb() when the admin operation is finished. */
        rkqu = rd_kafka_queue_get_background(self->rk);

        /*
         * Call ElectLeaders.
         *
         * We need to set up a CallState and release the GIL here since
         * the event_cb may be triggered immediately.
         */
        CallState_begin(self, &cs);
        rd_kafka_ElectLeaders(self->rk, c_elect_leaders, c_options, rkqu);
        CallState_end(self, &cs);

        rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

        rd_kafka_AdminOptions_destroy(c_options);
        rd_kafka_ElectLeaders_destroy(c_elect_leaders);

        Py_RETURN_NONE;

err:
        if (c_elect_leaders) {
                rd_kafka_ElectLeaders_destroy(c_elect_leaders);
        }
        if (c_options) {
                rd_kafka_AdminOptions_destroy(c_options);
                Py_DECREF(future);
        }
        return NULL;
}

const char Admin_elect_leaders_doc[] = PyDoc_STR(
        ".. py:function:: elect_leaders(election_type, partitions, "
        "future, [request_timeout, operation_timeout])\n"
        "\n"
        " Perform Preferred or Unclean leader election for the specified "
        "topic partitions.\n"
        "\n"
        " This method should not be used directly, use "
        "confluent_kafka.AdminClient.elect_leaders()\n");

/**
 * @brief Call rd_kafka_poll() and keep track of crashing callbacks.
 * @returns -1 if callback crashed (or poll() failed), else the number
@@ -3225,6 +3323,10 @@ static PyMethodDef Admin_methods[] = {
          Admin_delete_records_doc
        },

        { "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
          Admin_elect_leaders_doc
        },

        { NULL }
};

@@ -4875,6 +4977,23 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
                break;
        }

        case RD_KAFKA_EVENT_ELECTLEADERS_RESULT:
        {
                size_t c_result_cnt;

                const rd_kafka_ElectLeaders_result_t *c_elect_leaders_res_event =
                        rd_kafka_event_ElectLeaders_result(rkev);

                const rd_kafka_topic_partition_result_t **partition_results =
                        rd_kafka_ElectLeaders_result_partitions(
                                c_elect_leaders_res_event, &c_result_cnt);

                result = c_topic_partition_result_to_py_dict(partition_results,
                                                             c_result_cnt);

                break;
        }

        default:
                Py_DECREF(error); /* Py_None */
                error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
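Since the C binding resolves a single future through the background event callback above, a caller only ever waits on one future and then inspects the per-partition outcomes. Below is a sketch of passing the two optional timeouts parsed by Admin_elect_leaders(); the broker address and timeout values are placeholders:

    from confluent_kafka.admin import AdminClient, ElectionType

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})

    fut = admin.elect_leaders(
        ElectionType.UNCLEAN,
        None,                    # None requests the election on all topic partitions
        request_timeout=30.0,    # overall request timeout, in seconds
        operation_timeout=60.0,  # how long the broker may block while the election propagates
    )

    results = fut.result()
    failed = {tp: err for tp, err in results.items() if err is not None}
    print(f"{len(results) - len(failed)} partition(s) elected, {len(failed)} failed")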
9 changes: 9 additions & 0 deletions src/confluent_kafka/src/AdminTypes.c
@@ -597,6 +597,14 @@ static void AdminTypes_AddObjectsOffsetSpec (PyObject *m) {
        PyModule_AddIntConstant(m, "OFFSET_SPEC_LATEST", RD_KAFKA_OFFSET_SPEC_LATEST);
}

static void AdminTypes_AddObjectsElectionType(PyObject *m) {
        /* rd_kafka_ElectionType_t */
        PyModule_AddIntConstant(m, "ELECTION_TYPE_PREFERRED",
                                RD_KAFKA_ELECTION_TYPE_PREFERRED);
        PyModule_AddIntConstant(m, "ELECTION_TYPE_UNCLEAN",
                                RD_KAFKA_ELECTION_TYPE_UNCLEAN);
}

/**
 * @brief Add Admin types to module
 */
@@ -616,4 +624,5 @@ void AdminTypes_AddObjects (PyObject *m) {
        AdminTypes_AddObjectsScramMechanismType(m);
        AdminTypes_AddObjectsIsolationLevel(m);
        AdminTypes_AddObjectsOffsetSpec(m);
        AdminTypes_AddObjectsElectionType(m);
}