Компьютерный справочник
5 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд

Concurrent modification exception java

java.util.concurrentmodificationexception – How to handle Concurrent Modification Exception

Posted by: Sotirios-Efstathios Maneas in exceptions December 31st, 2013 0 Views

The ConcurrentModificationException is a RuntimeException that may be thrown by methods that have detected concurrent modification of an object, when such modification is not permissible. An example of not permissible behavior is when a thread tries to modify the internal structure of a Collection , while another thread is iterating over it.

In general, the results of the iteration are undefined. Some iterators throw a ConcurrentModificationException when they detect such behavior. These iterators are called fail-fast iterators , as they stop the normal execution to report an error, rather that continuing in a non-deterministic way. Notice that this exception does not indicate that the object has been concurrently modified by a different thread. The exception is thrown even one thread is violating an object’s contract.

1. Error cases

In this section, we will describe and explain those cases that produce a ConcurrentModificationException .

Case 1: The Collection is internally modified, while a thread is iterating over it.

The result of the execution is:

The exception is thrown because we change the internal structure of the HashMap by removing an existing key, while we iterating over it.

Case 2: After the creation of an iterator, the Collection is internally modified by any method other than the iterator’s own methods for removal and addition.

The result of the execution is:

The exception is thrown because:

  • We create an iterator of the ArrayList .
  • We remove an object using the remove method, rather than the iterator’s own remove method.
  • We try to iterate over the list and thus, a ConcurrentModificationException is thrown.

Case 3: Two iterators simultaneously modify the internal structure of a Collection.

The result of the execution is:

The exception is thrown because:

  • We create two iterators of the ArrayList .
  • The 1 st iterator modifies the internal structure of the list, by removing its first object.
  • The 2 nd iterator tries to remove the first object as well, but the first object does not exist and thus, a ConcurrentModificationException is thrown.

2. Deal with the exception

First of all, we must understand how Java decides that a collection is modified concurrently and a ConcurrentModificationException must be thrown. In Java 7, the implementation of the ArrayList uses the following field, to provide a fail-fast iterator:

Following the same logic, the implementation of the HashMap uses the following field:

In both implementations, the modCount field indicates the number of times the collection has been structurally modified. For example, a structural modification can be an operation that changes the number of mappings in a HashMap , or an operation that changes the size of an ArrayList .

In any case, if the value of the modCount field changes unexpectedly, a ConcurrentModificationException is thrown.

In the rest of this section, we will discuss about techniques and tricks that help us avoid the ConcurrentModificationException :

Iterator’s remove method

In a single-threaded environment, use the iterator’s remove method, in order to concurrently iterate over a collection and remove things from it. For example:


In order to avoid more than one threads accessing or modifying the same object, you can synchronize them over the object, in order to allow only one thread to manipulate it over the time. However, notice that this approach may reduce the performance of your application, or create deadlocks, if the application has not been developed carefully.

Synchronized Collections

In addition to their default implementations, Java provides a synchronized implementation of a Map , a List , a Set , a Collection , etc. through the Collections class. Moreover, Java provides the CopyOnWriteArrayList class, in which all mutative operations are implemented by making a fresh copy of the underlying array. Finally, Java also provides the ConcurrentHashMap class, which offers full concurrency of retrievals and adjustable expected concurrency for updates.

All referenced implementations are thread safe. However, the usage of such data structures may also reduce the performance of your application, as thread synchronization spends CPU cycles.

To conclude, all aforementioned methods aim to eliminate the ConcurrentModificationException . However, in a multi-threaded environment this elimination usually comes with the cost of thread synchronization. In any case, each application has its own specifications and requirements and thus, a meticulous design and implementation are very important in order for such exceptions to be eliminated.

3. Download The Eclipse Project

This was a tutorial on how to handle the ConcurrentModificationException .

Avoid ConcurrentModificationException while looping over Java ArrayList?

Apart from the NullPointerException and ClassNotFoundException, ConcurrentModificationException is another nightmare for Java developers. What makes this error tricky is the word concurrent, which always mislead Java programmers that this exception is coming because multiple threads are trying to modify the collection at the same time. Then begins the hunting, they spent countless hours to find the code which has the probability of concurrent modification. While in reality ConcurrentModficationException can also come on the single threaded environment. To give you an example, just loop over a list using for loop and try to remove one element, you will get the ConcurrentModificatoinExcetpion? Why? because you broke the rule of not modifying a Collection during iteration.

How does Java knows to throw ConcurrentModificationExeption? It uses a transient variable called modCount , which keeps track of how many times a list is modified structurally. Structural modifications are those that change the size of the list, which may affect the progress of iteration and may yield incorrect results. Both Iterator and ListIterator uses this field to detect unexpected change. Other methods of List which structurally modify List also uses this method e.g. add() , remove() .

Problem: loop over an ArrayList and remove selected elements, but remove() is throwing «Exception in thread «main» java.util.ConcurrentModificationException» .

Читать еще:  Java fileinputstream read

Cause: The real cause of ConcurrentModfiicationException is inconsistent modCount. When you are iterating over ArrayList then Iterator’s next() method keep track of modCount. If you modify the collection by adding or removing element then modCount will change and it will not match with the expected modCount, hence Iterator will throw ConcurrentModificationException.

Here is the code snippet from hasNext() method which shows there is check for modCount:

Now if you check this checkForComodification() method, you will find what I just said:

Solution: Use Iterator if you are doing it on the single threaded environment, otherwise use concurrent collection classes e.g. CopyOnWriteArrayList to remove elements while you are looping over it.

Solving ConcurrentModificationException in ArrayList

Here is the Java program to demonstrate one scenario where you get the ConcurrentModificationException even if just one thread is modifying the ArrayList. In this example, we are looping over ArrayList using advanced for loop and removing selected elements, but because we are using ArrayList’s remove() method.

If you uncomment the commented code in first loop and second loop, you will get the following exception:

Exception in thread «main» java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.util.ArrayList$Itr.next(Unknown Source)
at dto.ReverseArrayInPlace.main(ReverseArrayInPlace.java:28)

because we are using ArrayList’s remove() method. In the second example, we have used remove() method of Iterator and that’s why we are successfully able to delete selected elements from the ArrayList without ConcurrentModificationException.

Here is summary of important points about solving ConcurrentModificationException while looping over ArrayList in Java :

Многопоточное программирование в Java 8. Часть первая. Параллельное выполнение кода с помощью потоков

    Переводы, 8 июля 2015 в 16:58

Добро пожаловать в первую часть руководства по параллельному программированию в Java 8. В этой части мы на простых примерах рассмотрим, как выполнять код параллельно с помощью потоков, задач и сервисов исполнителей.

Впервые Concurrency API был представлен вместе с выходом Java 5 и с тех пор постоянно развивался с каждой новой версией Java. Большую часть примеров можно реализовать на более старых версиях, однако в этой статье я собираюсь использовать лямбда-выражения. Если вы все еще не знакомы с нововведениями Java 8, рекомендую посмотреть мое руководство.

Потоки и задачи

Все современные операционные системы поддерживают параллельное выполнение кода с помощью процессов и потоков. Процесс — это экземпляр программы, который запускается независимо от остальных. Например, когда вы запускаете программу на Java, ОС создает новый процесс, который работает параллельно другим. Внутри процессов мы можем использовать потоки, тем самым выжав из процессора максимум возможностей.

Потоки (threads) в Java поддерживаются начиная с JDK 1.0. Прежде чем запустить поток, ему надо предоставить участок кода, который обычно называется «задачей» (task). Это делается через реализацию интерфейса Runnable , у которого есть только один метод без аргументов, возвращающий void — run() . Вот пример того, как это работает:

Поскольку интерфейс Runnable функциональный, мы можем использовать лямбда-выражения, которые появились в Java 8. В примере мы создаем задачу, которая выводит имя текущего потока на консоль, и запускаем ее сначала в главном потоке, а затем — в отдельном.

Результат выполнения этого кода может выглядеть так:

Из-за параллельного выполнения мы не можем сказать, будет наш поток запущен до или после вывода «Done!» на экран. Эта особенность делает параллельное программирование сложной задачей в больших приложениях.

Sportmaster Lab, Москва

Потоки могут быть приостановлены на некоторое время. Это весьма полезно, если мы хотим сэмулировать долго выполняющуюся задачу. Например, так:

Когда вы запустите этот код, вы увидите секундную задержку между выводом первой и второй строки на экран. TimeUnit — полезный класс для работы с единицами времени, но то же самое можно сделать с помощью Thread.sleep(1000) .

Работать с потоками напрямую неудобно и чревато ошибками. Поэтому в 2004 году в Java 5 добавили Concurrency API. Он находится в пакете java.util.concurrent и содержит большое количество полезных классов и методов для многопоточного программирования. С тех пор Concurrency API непрерывно развивался и развивается.

Давайте теперь подробнее рассмотрим одну из самых важных частей Concurrency API — сервис исполнителей (executor services).


Concurrency API вводит понятие сервиса-исполнителя (ExecutorService) — высокоуровневую замену работе с потоками напрямую. Исполнители выполняют задачи асинхронно и обычно используют пул потоков, так что нам не надо создавать их вручную. Все потоки из пула будут использованы повторно после выполнения задачи, а значит, мы можем создать в приложении столько задач, сколько хотим, используя один исполнитель.

Вот как будет выглядеть наш первый пример с использованием исполнителя:

Класс Executors предоставляет удобные методы-фабрики для создания различных сервисов исполнителей. В данном случае мы использовали исполнитель с одним потоком.

Результат выглядит так же, как в прошлый раз. Но у этого кода есть важное отличие — он никогда не остановится. Работу исполнителей надо завершать явно. Для этого в интерфейсе ExecutorService есть два метода: shutdown() , который ждет завершения запущенных задач, и shutdownNow() , который останавливает исполнитель немедленно.

Вот как я предпочитаю останавливать исполнителей:

Исполнитель пытается завершить работу, ожидая завершения запущенных задач в течение определенного времени (5 секунд). По истечении этого времени он останавливается, прерывая все незавершенные задачи.

Callable и Future

Кроме Runnable , исполнители могут принимать другой вид задач, который называется Callable . Callable — это также функциональный интерфейс, но, в отличие от Runnable , он может возвращать значение.

Давайте напишем задачу, которая возвращает целое число после секундной паузы:

Callable-задачи также могут быть переданы исполнителям. Но как тогда получить результат, который они возвращают? Поскольку метод submit() не ждет завершения задачи, исполнитель не может вернуть результат задачи напрямую. Вместо этого исполнитель возвращает специальный объект Future, у которого мы сможем запросить результат задачи.

После отправки задачи исполнителю мы сначала проверяем, завершено ли ее выполнение, с помощью метода isDone() . Поскольку задача имеет задержку в одну секунду, прежде чем вернуть число, я более чем уверен, что она еще не завершена.

Читать еще:  Setbounds в java

Вызов метода get() блокирует поток и ждет завершения задачи, а затем возвращает результат ее выполнения. Теперь future.isDone() вернет true , и мы увидим на консоли следующее:

Задачи жестко связаны с сервисом исполнителей, и, если вы его остановите, попытка получить результат задачи выбросит исключение:

Вы, возможно, заметили, что на этот раз мы создаем сервис немного по-другому: с помощью метода newFixedThreadPool(1) , который вернет исполнителя с пулом в один поток. Это эквивалентно вызову метода newSingleThreadExecutor() , однако мы можем изменить количество потоков в пуле.


Любой вызов метода future.get() блокирует поток до тех пор, пока задача не будет завершена. В наихудшем случае выполнение задачи не завершится никогда, блокируя ваше приложение. Избежать этого можно, передав таймаут:

Выполнение этого кода вызовет TimeoutException :

Вы уже, возможно, догадались, почему было выброшено это исключение: мы указали максимальное время ожидания выполнения задачи в одну секунду, в то время как ее выполнение занимает две.


Исполнители могут принимать список задач на выполнение с помощью метода invokeAll() , который принимает коллекцию callable-задач и возвращает список из Future .

В этом примере мы использовали функциональные потоки Java 8 для обработки задач, возвращенных методом invokeAll . Мы прошлись по всем задачам и вывели их результат на консоль. Если вы не знакомы с потоками (streams) Java 8, смотрите мое руководство.


Другой способ отдать на выполнение несколько задач — метод invokeAny() . Он работает немного по-другому: вместо возврата Future он блокирует поток до того, как завершится хоть одна задача, и возвращает ее результат.

Чтобы показать, как работает этот метод, создадим метод, эмулирующий поведение различных задач. Он будет возвращать Callable , который вернет указанную строку после необходимой задержки:

Используем этот метод, чтобы создать несколько задач с разными строками и задержками от одной до трех секунд. Отправка этих задач исполнителю через метод invokeAny() вернет результат задачи с наименьшей задержкой. В данном случае это «task2»:

В примере выше использован еще один вид исполнителей, который создается с помощью метода newWorkStealingPool() . Этот метод появился в Java 8 и ведет себя не так, как другие: вместо использования фиксированного количества потоков он создает ForkJoinPool с определенным параллелизмом (parallelism size), по умолчанию равным количеству ядер машины.

ForkJoinPool впервые появился в Java 7, и мы рассмотрим его подробнее в следующих частях нашего руководства. А теперь давайте посмотрим на исполнители с планировщиком (scheduled executors).

Исполнители с планировщиком

Мы уже знаем, как отдать задачу исполнителю и получить ее результат. Для того, чтобы периодически запускать задачу, мы можем использовать пул потоков с планировщиком.

ScheduledExecutorService способен запускать задачи один или несколько раз с заданным интервалом.

Этот пример показывает, как заставить исполнитель выполнить задачу через три секунды:

Когда мы передаем задачу планировщику, он возвращает особый тип Future — ScheduledFuture , который предоставляет метод getDelay() для получения оставшегося до запуска времени.

У исполнителя с планировщиком есть два метода для установки задач: scheduleAtFixedRate() и scheduleWithFixedDelay() . Первый устанавливает задачи с определенным интервалом, например, в одну секунду:

Кроме того, он принимает начальную задержку, которая определяет время до первого запуска.

Обратите внимание, что метод scheduleAtFixedRate() не берет в расчет время выполнения задачи. Так, если вы поставите задачу, которая выполняется две секунды, с интервалом в одну, пул потоков рано или поздно переполнится.

В этом случае необходимо использовать метод scheduleWithFixedDelay() . Он работает примерно так же, как и предыдущий, но указанный интервал будет отсчитываться от времени завершения предыдущей задачи.

В этом примере мы ставим задачу с задержкой в одну секунду между окончанием выполнения задачи и началом следующей. Начальной задержки нет, и каждая задача выполняется две секунды. Так, задачи будут запускаться на 0, 3, 6, 9 и т. д. секунде. Как видите, метод scheduleWithFixedDelay() весьма полезен, если мы не можем заранее сказать, сколько будет выполняться задача.

Это была первая часть серии статей про многопоточное программирование. Настоятельно рекомендую разобрать вышеприведенные примеры самостоятельно. Все они доступны на GitHub. Можете смело форкать репозиторий и добавлять его в избранное.

Надеюсь, вам понравилась статья. Если у вас возникли какие-либо вопросы, вы можете задать их в твиттере.

Основные типы исключений (Exception) в java

Перечень наиболее часто встречающихся исключений (исключительных ситуаций, ошибок) в языке программирования java с расшифровкой их значения.


Возникла исключительная ситуация, связанная с ошибкой при выполнении арифметического вычисления (например, с попыткой целочисленного деления на нуль). Класс ArithmeticalException унаследован от RuntimeException.


Задано значение индекса массива, не принадлежащее допустимому диапазону. Имеется дополнительный конструктор, принимающий в качестве параметра ошибочное значение индекса и включающий его в текст описательного сообщения. Класс ArrayIndexOutOfBoundsException унаследован от IndexOutOfBoundException


Предпринята попытка сохранения в массиве объекта недопустимого типа. Возникает, если попытаться записать в ячейку массива ссылку на объект неправильного типа.

Класс ArrayStoreException унаследован от RuntimeException.


Выполнена неверная операция преобразования типов (ошибка приведения типов).

Класс ClassCastException унаследован от RuntimeException.


Осуществлена попытка изменения объекта конкурирующим потоком вычислений (thread) с нарушением контракта класса (тип определен в пакете jav.util).

Также исключение может происходить при работе с коллекциями при обычной однопоточной работе. ConcurrentModificationException возникает когда коллекция модифицируется «одновременно» с проходом по коллекции итератором любыми средствами кроме самого итератора.

Класс ConcurrentModificationException унаследован от RuntimeException.


Возникает при попытке извлечения объекта из пустого стека. Тип обладает только конструктором без параметров, поскольку причина ситуации очевидна без дополнительных разъяснений (тип определен в пакете java.util).

Класс EmptyStackExceptionунаследован от RuntimeException.


Методу передано неверное значение аргумента (например, отрицательное, когда метод предполагает задание положительных значений).

Класс IllegalArgumentExceptionунаследован от RuntimeException.


Выполнено обращение к методу wait, notifyAll или notify объекта, когда текущий поток вычислений не обладает блокировкой (lock) этого объекта.

Класс IllegalMonitorStateException унаследован от RuntimeException.


Предпринята попытка выполнения операции в то время, когда объект не находится в соответствующем состоянии (например при регистрации или удалении ловушки события закрытия исполняющей системы (shutdown hook) после начала процедуры закрытия).

Читать еще:  Стандартная ошибка уравнения регрессии

Класс IllegalStateExceptionунаследован от RuntimeException.


Предпринята попытка выполнения операции в то время, когда объект потока вычислений не находится в соответствующем состоянии (например, вызван метод start для потока, который уже приступил к работе).

Класс IllegalThreadStateException унаследован от IllegalArgumentException


Задано значение индекса массива или содержимого строки типа String, не принадлежащее допустимому диапазону.

Класс IndexOutOfBoundsException унаследован от RuntimeException


Не найден требуемый ресурс или пакет ресурсов (resource bundle). Единственный конструктор типа предусматривает задание трех аргументов: строки описательного сообщения, наименования класса ресурсов и объекта ключа, отвечающего отсутствующему ресурсу. Для получения строк наименования класса и ключа применяются методы detClassName и getKey соответственно (тип определен в пакете java.util).

Класс MissingResourceExceptionунаследован от RuntimeException.


Предпринята попытка создания массива с размером, значение которого задано отрицательным числом.

Класс NegativeArraySizeException унаследован от RuntimeException.


Операция поиска элемента в объекте одного из контейнерных классов завершилась неудачей (тип определен в пакете java.util).

Класс NoSuchElementException унаследован от RuntimeException.


Возникает при попытке обращения к полю, методу или объекту по ссылке, равной null. Также исключение выбрасывается, когда метод, не допускающий передачи аргумента null, был вызван с заданием значения null. В последнем случае может быть сгенерировано и исключение типа IllegalArgumentException.

Класс NullPointerException унаследован от RuntimeException.


Строка, которая, как предполагалось должна содержать представление числа, не отвечает этому требованию. Исключение выбрасывается такими методами, как, например, Integer.parseInt.

Класс NumberFormatException унаследован от IllegalArgumentException.


Предпринята попытка выполнения операции, запрещенной системой обеспечения безопасности в соответствии с действующей политикой безопасности.

Класс SecurityException унаследован от RuntimeException.


Задано значение индекса содержимого строки типа String, не принадлежащее допустимому диапазону. Имеется дополнительный конструктор, принимающий в качестве параметра ошибочное значение индекса и включающий его в текст описательного сообщения.

Класс StringIndexOutOfBoundsException унаследован от IndexOutOfBoundsException.


Выбрасывается при обращении к методу целевого объекта посредством объекта рефлективного класса Proxy, если метод invoke объекта InvocationHandler генерирует объявляемое исключение, которое не допускает присваивания ни одному из типов исключений, упомянутых в предложении throws метода целевого объекта. Рассматриваемое исключение содержит ссылку на исключение, генерируемое методом invoke, которое может быть получено с помощью метода getUndeclaredThrowable. Класс исключений UndeclaredThrowableException поддерживает два конструктора: оба принимают в качестве параметров ссылку на объект Throwable, а один из них, помимо того, строку описания (тип определен в пакете java.lang.reflect).

Класс UndeclaredThrowableException унаследован от RuntimeException.


Предпринята попытка выполнения операции над объектом, который ее не поддерживает (например, модификация объекта, обозначенного признаком «только для чтения»). используется также классами коллекций из состава пакета java.util как реакция на вызов методов производного класса, реализация которых не обязательна.

Класс UnsupportedOperationException унаследован от RuntimeException.

java.util.ConcurrentModificationException: KafkaConsumer небезопасно для многопоточного доступа

У меня есть приложение Scala Spark Streaming , которое получает данные из одной темы из 3 разных Kafka producers .

Потоковое приложение Свеча на машину с хоста , Кафка сервер на машину с хоста , Kafka producers на на машины, , , .

Когда я пытаюсь запустить приложение Spark Streaming получил ниже ошибки

Исключение в потоке «main» org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 19.0 не удалось выполнить 1 раз, последний сбой: потерянная задача 0.0 на этапе 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer небезопасно для многопоточного доступа по адресу орг..apache.Кафка.клиенты.потребитель.KafkaConsumer.приобрести(KafkaConsumer.java:1625) около орг..apache.Кафка.клиенты.потребитель.KafkaConsumer.искать(KafkaConsumer.java:1198) около орг..apache.искра.струящийся.Кафка.CachedKafkaConsumer.искать(CachedKafkaConsumer.scala:95) около орг..apache.искра.струящийся.Кафка.CachedKafkaConsumer.вам(CachedKafkaConsumer.scala:69) около org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.далее (KafkaRDD.scala:228) около org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.далее (KafkaRDD.scala:194) на scala.collection.Iterator$$ $ 11.далее (Iterator.scala:409) в scala.collection.Iterator$$anon$11.далее (Iterator.scala:409) в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$ $ anonfun$13$ $ anonfun$применить$7.apply$mcV$sp(PairRDDFunctions.scala:1204) около org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$ $ anonfun$13$ $ anonfun$применить$7.применить (PairRDDFunctions.scala:1203) около org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$ $ anonfun$13$ $ anonfun$применить$7.применить (PairRDDFunctions.scala:1203) около org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) около org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$ $ anonfun$13.применить (PairRDDFunctions.scala:1211) около org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$ $ anonfun$13.применить (PairRDDFunctions.scala:1190) в орг.apache.искра.планировщик.ResultTask.runTask(ResultTask.scala:70) в орг.apache.искра.планировщик.Задача.запустить(Task.scala:85) в org.apache.spark.executor.Executor$TaskRunner.запустить(Executor.scala:274) около java.утиль.параллельный.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) около java.util.concurrent.ThreadPoolExecutor$работник.запустить(ThreadPoolExecutor.java:617) в java.яз..Нить.запустить(Thread.java:748)

Теперь я читаю тысячи разных сообщений, но никто, кажется, не может найти решение по этому вопросу.

Как я могу справиться с этим в моем приложении? Должен ли я изменить некоторые параметры на Kakfa (на данный момент параметр num.partition установлен в 1)?

Ниже приведен код моего приложения :

1 Ответ

Ваша проблема здесь:

Поскольку Spark создает график потоков, и вы определяете два потока здесь:

Spark попытается одновременно запустить оба этих графика, поскольку они независимы друг от друга. Поскольку Kafka использует кэшированный потребительский подход, он эффективно пытается использовать одного и того же потребителя для обоих исполнений потока.

Что вы можете сделать, это кэшировать DStream перед запуском двух запросов:

Похожие вопросы:

У меня есть следующий класс : public class AggregationController < private HashMap >messages; private HashMap counters; Boolean.

Я использую приведенный ниже код для чтения из темы Кафки и обработки данных. JavaDStream transformedMessages = messages.flatMap(record -> processData(record)) .transform(new.

Я использую Spark structured streaming с Kafka, однако, когда я пытаюсь записать поток на консоль, я получаю ошибку: Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe.

Я получил ConcurrentModificationException в строке String name = (String) keys.next();, когда я запускал код java в многопоточности. Часть функции, которая вызвала ошибку, выглядит следующим.

Мы вызываем SparkSQL job из Spark streaming. Мы получаем параллельное исключение, а потребитель Кафки-закрытую ошибку. Вот код и детали исключения: Потребительский код Кафки // Start reading.

Я использую Apache Kafka, планируя обновить свое приложение до использования Apache kafka 1.0.0. Пока я изучаю потоки Кафки, у меня возник вопрос о разнице между KafkaConsumer и.

Я включил Vimeo в тему WordPress, которую я создаю, и я получаю эти ошибки: Небезопасный JavaScript попытка доступа к кадру с URL http://themes.ibrogram.com/beta/blog/ из кадра с URL.

Я установил пакет kafka-python для Python. У меня есть Кафка продюсер и потребитель работает. Я хочу, чтобы код python читал тему Кафки и распечатывал сообщения. Мой python код ниже: import sys from.

Док Кафки дает подход примерно со следующими описаниями: Один потребитель на Thread:A простой вариант-дать каждому потоку свой собственный экземпляр consumer >. Мой код: public class.

Я учусь KafkaConsumer и у меня есть вопрос в строке 3 1.from kafka import KafkaConsumer 2.consumer = KafkaConsumer(arglist) 3.for message in consumer Я знаю. в. заявление. Этот цикл for.

Ссылка на основную публикацию
ВсеИнструменты 220 Вольт