Штурмовой болт провалился над DRPC

Я пишу топологию Storm для чтения данных из HBase с использованием DRPC. По сути, это выполняет проверку для получения данных, обогащает данные и возвращает их.

Я могу легко получить базовый пример DRPC, работающий (на основе http://storm.apache.org/releases/current/Distributed-RPC.html ). Однако, когда я вставляю код для сканирования, процесс занимает очень много времени. Через минуту я получаю следующую ошибку:

backtype.storm.generated.DRPCExecutionException

at backtype.storm.daemon.drpc $ service_handler $ reify__8688.failRequest (drpc.clj: 136) ~ [storm-core-0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258]

на backtype.storm.drpc.DRPCSpout.fail (DRPCSpout.java:241) ~ [storm-core-0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258

Через некоторое время я получаю org.apache.hadoop.hbase.client.RetriesExhaustedException. Это не всегда происходит, но очень распространено. Мое предположение, основанное на этом, является одной из двух возможностей:

Сканирование синхронизируется. Однако выполнение сканирования через HBase Shell или REST возвращает менее чем за секунду. Таблица несовместима, в результате чего определенная область отсутствует. Я запустил hbase hbck, и он показывает 0 несоответствий. Я знаю, что соединение с HBase в порядке: я добавил отладочный вывод, и болт получает результаты. Однако из-за исключения DRPCExecutionException эти результаты никогда не возвращаются по DRPC.

Я, хотя проблема была таймаутом DRPC, однако я увеличил тайм-аут DRPC много, и я получаю тот же результат за такое же количество времени. После Googling я нашел кого-то другого с той же проблемой ( [Storm] [DRPC] Request failed ), но нет никаких указаний, как это исправить.

Для справки я добавляю свой код ниже:

try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI"))) 
    {
        List<Filter> filters = new ArrayList<>();
        String startRowString = "start";
        String endRowString = "end";
        RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
        filters.add(startRow);
        RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
        filters.add(endRow);
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);

        Scan scan = new Scan();
        scan.addFamily("f1".getBytes());
        scan.setFilter(filterList);

        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) 
        {
            hbaseValues.add(result);
        }
    }
}

Заранее спасибо за помощь.

hbase,apache-storm,

0

Ответов: 1


0 принят

Хорошо, поэтому я не знаю, почему возникла первоначальная проблема, но я исправил ее, значительно упростив сканер:

Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes());
scan.addFamily("f1".getBytes());
ResultScanner scanner = table.getScanner(scan);            
for (Result r : scanner)
{...}

Исходя из этого, проблема, похоже, связана с HBase, а не с Storm.

HBase, апач буря,