Which version of Kafka does this connector support? #273

Closed
arshahmad1 opened this issue Jul 3, 2024 · 14 comments
arshahmad1 commented Jul 3, 2024

Hi guys, I'm using this connector to sink data from Kafka to S3 as Iceberg tables, but I'm getting the following exception:

java.lang.NoSuchMethodError: 'org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions.requireStable(boolean)'

Which version of Kafka is supported by this connector? My AWS managed Kafka (MSK) version is 3.5.1.

I also tried downgrading my Kafka to 2.8.2 and I'm still getting the same error.

@arshahmad1 (Author)

Hey @tabmatfournier, sorry to ping you directly. Could you please help me out here?

antcalvente commented Jul 11, 2024

I'm facing the same issue on my side as well (using MSK Kafka version 3.7.1). It looks like it might be an issue with transitive dependencies: kafka-ver is declared as 3.5.1, but the requireStable method was only introduced in version 3.3.* (https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.html#requireStable(boolean)).

I had to downgrade the connector to older versions to avoid this issue, but then I face other problems, like constant re-joining of the consumer group and commit timeouts.

I tried to force the version with Gradle resolution strategies and constraints, but even though the build shows the correct version, at runtime it seems to resolve to one prior to 3.3.* (a quick runtime check like the snippet after the stack trace shows which version actually gets loaded).

Full stacktrace:

[Worker-0e0622204e5dac11a] [2024-07-11 14:20:35,357] ERROR [SinkConnector|task-0] WorkerSinkTask{id=SinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
--
[Worker-0e0622204e5dac11a] java.lang.NoSuchMethodError: 'org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions.requireStable(boolean)'
[Worker-0e0622204e5dac11a] 	at io.tabular.iceberg.connect.channel.CommitterImpl.fetchStableConsumerOffsets(CommitterImpl.java:116)
[Worker-0e0622204e5dac11a] 	at io.tabular.iceberg.connect.channel.CommitterImpl.<init>(CommitterImpl.java:97)
[Worker-0e0622204e5dac11a] 	at io.tabular.iceberg.connect.channel.CommitterImpl.<init>(CommitterImpl.java:70)
[Worker-0e0622204e5dac11a] 	at io.tabular.iceberg.connect.channel.TaskImpl.<init>(TaskImpl.java:37)
[Worker-0e0622204e5dac11a] 	at io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:56)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:640)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:705)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
[Worker-0e0622204e5dac11a] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0e0622204e5dac11a] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0e0622204e5dac11a] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0e0622204e5dac11a] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0e0622204e5dac11a] 	at java.base/java.lang.Thread.run(Thread.java:829)
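
For anyone else debugging this: a throwaway check like the one below (not part of the connector, just plain kafka-clients API) shows which kafka-clients version the worker actually loads at runtime and which jar it comes from.

```java
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.common.utils.AppInfoParser;

// Throwaway diagnostic: print the kafka-clients version actually on the classpath
// at runtime, plus the jar that ListConsumerGroupOffsetsOptions was loaded from.
// If the version is older than the one where requireStable was added (3.3.* per
// the javadoc linked above), the NoSuchMethodError in the stack trace is expected.
public class KafkaClientsVersionCheck {
  public static void main(String[] args) {
    System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    System.out.println("loaded from: "
        + ListConsumerGroupOffsetsOptions.class.getProtectionDomain()
            .getCodeSource().getLocation());
  }
}
```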

@ron-damon

Hi guys,

IIUC the problem is MSK Connect's version¹ (2.7.1), not MSK per se.
We're using the sink connector (v0.6.19) against MSK (Kafka v3.5.1) without any problem, but we had to move from MSK Connect to a non-managed Kafka Connect.

¹ From Amazon Managed Streaming for Apache Kafka's Developer Guide:

What is MSK Connect?
MSK Connect is a feature of Amazon MSK that makes it easy for developers to stream data to and from their Apache Kafka clusters. MSK Connect uses Kafka Connect 2.7.1, an open-source framework for connecting Apache Kafka clusters with external systems such as databases, search indexes, and file systems.
...

antcalvente commented Jul 17, 2024

Hi @ron-damon,
Agree with you, the MSK server version "shouldn't" be related to this issue. I'm currently using my own build of the connector with MSK Connect and it works wonderfully, but I had to do three things.

The first two are simple and I'll open a PR for them ASAP (edit: #275):

  • Increase initial polling duration on CommitterImpl.java#L104 to 30s
  • Increase initial polling duration on Coordinator.java#L93 to 30s

This solves the disconnection issues I was having with remote Kafka servers or MSK; now it connects without any issue (this didn't happen with a local Kafka, for obvious reasons, since the connection between the connector and the Kafka server is immediate). Regarding the timeout, I haven't seen any downside to increasing it, apart from it being a fairly arbitrary value that happens to work for MSK (a timeout is already set in the code anyway); still, it's a magic number that works for my case...
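
Just to show how small this change is, it amounts to something along these lines (purely illustrative; the real values live in CommitterImpl.java and Coordinator.java and may look different):

```java
import java.time.Duration;

// Purely illustrative: the change replaces a short initial polling duration
// with a more generous one for remote clusters.
final class InitialPollTimeouts {
  static final Duration BEFORE = Duration.ofSeconds(1);  // hypothetical original value
  static final Duration AFTER = Duration.ofSeconds(30);  // what worked against MSK for me
}
```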

And the last one is a trick to make the connector compatible with older kafka-clients and solve the issue I posted before for kafka-ver < 3.3 (see the sketch after the list):

  • Remove "requireStable(true)" from CommitterImpl.fetchStableConsumerOffsets()
    So the kafka-ver issue is still present on the latest master and should be solved/checked.
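
Roughly, the adjusted offset fetch ends up looking like the sketch below. This is just an illustration of the idea, not the connector's exact code (the real method is CommitterImpl.fetchStableConsumerOffsets); the trade-off is that without requireStable(true) the broker may hand back offsets that are not yet transactionally stable.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class OffsetFetchSketch {
  // Illustrative replacement: listConsumerGroupOffsets without the
  // ListConsumerGroupOffsetsOptions.requireStable(true) call, so it also works
  // against kafka-clients versions that predate that method.
  static Map<TopicPartition, OffsetAndMetadata> fetchConsumerOffsets(Admin admin, String groupId) {
    try {
      return admin.listConsumerGroupOffsets(groupId)
          .partitionsToOffsetAndMetadata()
          .get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException("Failed to fetch consumer group offsets for " + groupId, e);
    }
  }
}
```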

Sharu95 commented Jul 23, 2024

Stumbled upon this while experiencing the same issue on the MSK broker I'm working with: MSK Connect version 2.7.1, and MSK Kafka version 2.6.2, which unfortunately I can't update myself.

requireStable seems to have been introduced in 3.3, yeah, so I'm getting the same issue as @arshahmad1: no such method found 😅

Any backward compatible implementation would be amazing! @antcalvente, if you remove requireStable, how would offset stability be handled? 🤔

@antcalvente

Hi @Sharu95!
So far I have been generating more than 1 TB of data (around 3 billion messages) across different topics and, apart from seeing some rebalance messages at the beginning when creating the connector, everything runs smoothly.
I have a control topic for each topic (which is not necessary, per the documentation), and the only major error I have faced is when recreating the main topic while the connector is running in the background: the connector dies and needs to be recreated manually. In that specific case the consumer goes a bit crazy, and even after recreating it, it does weird things like having clashes and marking new messages as duplicates.
I'd suggest you fork the repo and remove that requireStable(true) yourself, in case it unblocks you until a maintainer of the project takes a look at the issue... (that worked for me)

In any case, as ron mentioned before, the issue seems to come from a dependency of this project/MSK Connect and not from your MSK server version :)

Sharu95 commented Jul 24, 2024

Thanks for the reply @antcalvente! I've forked it and will stick to that version for now; I'm just exploring at the moment, so nothing is urgently blocking really :) I can't do much about the MSK/Connect versions anyway, so keeping a fork is a good workaround until things are fixed.

arshahmad1 commented Jul 26, 2024

Hey @antcalvente, thank you so much for taking the time to provide such a detailed response.
As you mentioned above, I did the following and it stopped throwing the NoSuchMethodError:

  1. Increased initial polling duration on CommitterImpl.java#L104 to 30s
  2. Increased initial polling duration on Coordinator.java#L93 to 30s
  3. Removed "requireStable(true)" from CommitterImpl.fetchStableConsumerOffsets()
  4. Then I ran ./gradlew -xtest clean build
  5. And the zip archive ends up under ./kafka-connect-runtime/build/distributions

Thanks again for your response and help!

@arshahmad1 (Author)

@antcalvente, can you please provide your configuration for reference? I searched the internet and there are no reliable or precise configurations out there. It would be of great help. Thanks in advance.

@antcalvente

Here you go @arshahmad1 : https://gist.github.com/antcalvente/27d248019665c260e7c155d4d0860341

Hope it helps :)

Sharu95 commented Jul 26, 2024

@arshahmad1, I'm using the Glue catalog in AWS. As per the documentation, remember to set "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog" instead of iceberg.catalog: glue

edit: these are the required options for Glue: https://github.com/tabular-io/iceberg-kafka-connect?tab=readme-ov-file#glue-example
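
The Glue-related keys from that example boil down to something like the following (shown as a Java map purely for illustration; the warehouse path is a placeholder and should point at your own S3 location):

```java
import java.util.Map;

final class GlueCatalogConfigExample {
  // Illustrative key/value pairs for a Glue catalog, based on the README example.
  // "s3://my-bucket/warehouse" is a placeholder, not a real location.
  static final Map<String, String> GLUE_CATALOG_PROPS = Map.of(
      "iceberg.catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog",
      "iceberg.catalog.warehouse", "s3://my-bucket/warehouse",
      "iceberg.catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
}
```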

arshahmad1 commented Jul 26, 2024

Actually @Sharu95, I'm using the configurations below, but I'm currently running into: Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Invalid table identifier: poc-kafka-flink.mskmytopicma_ib (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

Sharu95 commented Jul 27, 2024

@arshahmad1, what's the full stack trace?

I only use iceberg.tables=<my_database>.<my_table>, not the other ones (iceberg.glue.database and iceberg.glue.table). Also double-check the database name and whether the hyphens might be causing the issue.

Also make sure the control topic is created; I can't see that you've specified one here, so I assume you have created it, but the full stack trace would be helpful 😄

@arshahmad1 (Author)

I was able to solve this issue: you should NOT use underscores and hyphens in your database and table names 🤦
