I want to create and use Java utilities to retrieve Kafka cluster information and to create, modify, and delete topics.

To create the utilities, I am trying the example from this link.

This is how I have set up the properties in my code:

Properties adminConfig = new Properties();
adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092,server2:9092,server3:9092");
adminConfig.put("security.protocol", "SASL_SSL");
adminConfig.put("security.mechanism", "PLAINTEXT");
adminConfig.put("ssl.keystore.type", "JKS");
adminConfig.put("ssl.keystore.location", "/config/dev/java.keystore.jks");
adminConfig.put("ssl.keystore.password", "password");
adminConfig.put("ssl.key.password", "password");
adminConfig.put("ssl.truststore.location", "/config/dev/java.truststore.jks");
adminConfig.put("ssl.truststore.password", "password");
adminConfig.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";");
AdminClient admin = KafkaAdminClient.create(adminConfig);
for (Node node : admin.describeCluster().nodes().get()) {
    System.out.println("-- node: " + node.id() + " --");
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, "0");
    DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
    dcr.all().get().forEach((k, c) -> {
        c.entries()
            .forEach(configEntry -> {
                System.out.println(configEntry.name() + "= " + configEntry.value());
        });
    });
}

This is the error I am getting:

WARNING: Illegal reflective access by org.apache.kafka.common.network.SaslChannelBuilder (file:/Users/x1234/.m2/repository/org/apache/kafka/kafka-clients/2.3.0/kafka-clients-2.3.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.kafka.common.network.SaslChannelBuilder
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:55)
    at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:42)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
    ... 2 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:104)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
    ... 6 more

If I do not set the SSL properties, I get the following error instead:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:18)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

What am I doing wrong when setting up the properties for using the utility?

  • Can you please share the current adminClient with the right configurations?
    – Zvi Mints
    Commented Aug 1, 2021 at 9:00

1 Answer

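Your configuration uses security.mechanism, which is not a valid setting. The property is named sasl.mechanism, and PLAINTEXT is a value for security.protocol, not a SASL mechanism. Because sasl.mechanism is never set, the client falls back to its default, GSSAPI (Kerberos), which is why the stack trace ends in KerberosLogin reporting that no serviceName is defined. See the Admin client configuration section on the Apache Kafka website for the list of settings.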

To fix your issue, replace:

adminConfig.put("security.mechanism", "PLAINTEXT");

with

adminConfig.put("sasl.mechanism", "PLAIN");
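For reference, here is a minimal sketch of the full set of properties with that fix applied. It uses plain string keys so it compiles without the Kafka jar. The class name, the helper name and its parameters are made up for illustration, and the paths and passwords are the placeholders from the question. The keystore entries are left out on the assumption that the brokers do not require a client certificate; keep them if yours do.

```java
import java.util.Properties;

// Sketch only: client properties for SASL_SSL with the PLAIN mechanism.
class AdminConfigSketch {

    // Hypothetical helper; the name and parameters are illustrative, not part of any Kafka API.
    static Properties buildAdminConfig(String bootstrapServers, String username, String password) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        // Transport: TLS-encrypted connection with SASL authentication on top.
        config.put("security.protocol", "SASL_SSL");
        // The SASL mechanism goes in sasl.mechanism; "security.mechanism" is not a Kafka setting.
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        // Truststore so the client can verify the brokers' certificates (placeholder values).
        config.put("ssl.truststore.location", "/config/dev/java.truststore.jks");
        config.put("ssl.truststore.password", "password");
        return config;
    }

    public static void main(String[] args) {
        Properties config = buildAdminConfig(
            "server1:9092,server2:9092,server3:9092", "client", "client-secret");
        System.out.println("sasl.mechanism = " + config.getProperty("sasl.mechanism"));
    }
}
```

The resulting Properties object can be passed to AdminClient.create(...) in place of adminConfig.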
  • Thanks. I tried that as well. I am still getting: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Which properties must I set? Also, I think our setup is SASL_SSL. How and where can I find the difference between the two and confirm which setting our implementation uses? Thanks
    – adbdkb
    Commented Feb 17, 2021 at 4:39
  • I had another parameter value wrong - broker id. Once I fixed that, it worked. Thanks
    – adbdkb
    Commented Feb 20, 2021 at 9:41
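
The last comment does not say which broker id value was wrong. Assuming it was the hard-coded "0" passed to ConfigResource in the question's loop, one way to avoid that class of mistake is to take the id from each Node returned by describeCluster(). The sketch below needs kafka-clients on the classpath; the class and method names are made up for illustration, and the security properties still have to be added before it can connect to a SASL_SSL cluster.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: describe each broker's configs using the id the cluster reports.
class BrokerConfigSketch {

    // Hypothetical helper: builds the BROKER resource from the node's own id
    // instead of a hard-coded string such as "0".
    static ConfigResource brokerResource(Node node) {
        return new ConfigResource(ConfigResource.Type.BROKER, node.idString());
    }

    static void printBrokerConfigs(AdminClient admin) throws Exception {
        for (Node node : admin.describeCluster().nodes().get()) {
            System.out.println("-- node: " + node.id() + " --");
            ConfigResource cr = brokerResource(node);
            // all() resolves to a map keyed by the requested resources.
            Config config = admin.describeConfigs(Collections.singleton(cr)).all().get().get(cr);
            config.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));
        }
    }

    public static void main(String[] args) throws Exception {
        Properties adminConfig = new Properties();
        adminConfig.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092");
        // Assumption: add the security.protocol, sasl.* and ssl.* settings your cluster needs here.
        try (AdminClient admin = AdminClient.create(adminConfig)) {
            printBrokerConfigs(admin);
        }
    }
}
```

Wrapping the client in try-with-resources also closes it when the listing is done, which the original snippet does not do.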
